当前位置: 100md首页 > 医学版 > 医学资料 > 资料下载2021
编号:5595
spark大数据处理技术应用与性能优化版完整无水印.pdf
http://www.100md.com 2020年11月16日
第1页
第7页
第11页
第22页
第45页
第364页

    参见附件(7270KB,397页)。

     spark大数据处理技术应用与性能优化版

    这是一本依据*技术版本,系统、全面、详细讲解Spark的著作,作者结合自己在微软和IBM的实践经验和对Spark源代码的研究撰写而成,系统、全面、详细讲解Spark的各项功能使用、原理机制、技术细节、应用方法、性能优化,以及BDAS生态系统的相关技术

    书籍相关内容部分预览

    资料简介

    这是一本依据*技术版本,系统、全面、详细讲解Spark的著作,作者结合自己在微软和IBM的实践经验和对Spark源代码的研究撰写而成。

    首先从技术层面讲解了Spark的体系结构、工作机制、安装与部署、开发环境搭建、计算模型、Benchmark、BDAS等内容;然后从应用角度讲解了一些简单的、有代表性的案例;最后对Spark的性能优化进行了探讨。 这是一本依据*技术版本,系统、全面、详细讲解Spark的著作,作者结合自己在微软和IBM的实践经验和对Spark源代码的研究撰写而成。首先从技术层面讲解了Spark的体系结构、工作机制、安装与部署、发环境搭建、计算模型、Benchmark、BDAS等内容;然后从应用角度讲解了一些简单的、有代表性的案例;最后对Spark的性能优化行了探讨。

    spark使用范围

    Spark已经在全球范围内广泛使用,无论是Intel、Yahoo!、Twitter、阿里巴巴、百度、腾讯等国际互联网巨头,还是一些尚处于成长期的小公司,都在使用Spark。

    本书作者结合自己在微软和IBM实践Spark的经历和经验,编写了这本书。站着初学者的角度,不仅系统、全面地讲解了Spark的各项功能及其使用方法,而且较深入地探讨了Spark的工作机制、运行原理以及BDAS生态系统中的其他技术,同时还有一些可供操作的案例,能让没有经验的读者迅速掌握Spark。更为重要的是,本书还对Spark的性能优化进行了探讨。

    目录

    前 言

    第1章 Spark简介 1

    1.1 Spark是什么 1

    1.2 Spark生态系统BDAS 4

    1.3 Spark架构 6

    1.4 Spark分布式架构与单机多核

    架构的异同 9

    1.5 Spark的企业级应用 10

    1.5.1 Spark在Amazon中的应用 11

    1.5.2 Spark在Yahoo!的应用 15

    1.5.3 Spark在西班牙电信的应用 17

    1.5.4 Spark在淘宝的应用 18

    1.6 本章小结 20

    第2章 Spark集群的安装与部署 21

    2.1 Spark的安装与部署 21

    2.1.1 在Linux集群上安装与配置Spark 21

    2.1.2 在Windows上安装与配置Spark 30

    2.2 Spark集群初试 33

    2.3 本章小结 35

    第3章 Spark计算模型 36

    3.1 Spark程序模型 36

    3.2 弹性分布式数据集 37

    3.2.1 RDD简介 38

    3.2.2 RDD与分布式共享内存的异同 38

    3.2.3 Spark的数据存储 39

    3.3 Spark算子分类及功能 41

    3.3.1 Value型Transformation算子 42

    3.3.2 Key-Value型Transformation算子 49

    3.3.3 Actions算子 53

    3.4 本章小结 59

    第4章 Spark工作机制详解 60

    4.1 Spark应用执行机制 60

    4.1.1 Spark执行机制总览 60

    4.1.2 Spark应用的概念 62

    4.1.3 应用提交与执行方式 63

    4.2 Spark调度与任务分配模块 65

    4.2.1 Spark应用程序之间的调度 66

    4.2.2 Spark应用程序内Job的调度 67

    4.2.3 Stage和TaskSetManager调度方式 72

    4.2.4 Task调度 74

    4.3 Spark I/O机制 77

    4.3.1 序列化 77

    4.3.2 压缩 78

    4.3.3 Spark块管理 80

    4.4 Spark通信模块 93

    4.4.1 通信框架AKKA 94

    4.4.2 Client、Master和Worker间的通信 95

    4.5 容错机制 104

    4.5.1 Lineage机制 104

    4.5.2 Checkpoint机制 108

    4.6 Shuffle机制 110

    4.7 本章小结 119

    第5章 Spark开发环境配置及流程 120

    5.1 Spark应用开发环境配置 120

    5.1.1 使用Intellij开发Spark程序 120

    5.1.2 使用Eclipse开发Spark程序 125

    5.1.3 使用SBT构建Spark程序 129

    5.1.4 使用Spark Shell开发运行Spark程序 130

    5.2 远程调试Spark程序 130

    5.3 Spark编译 132

    5.4 配置Spark源码阅读环境 135

    5.5 本章小结 135

    第6章 Spark编程实战 136

    6.1 WordCount 136

    6.2 Top K 138

    6.3 中位数 140

    6.4 倒排索引 141

    6.5 CountOnce 143

    6.6 倾斜连接 144

    6.7 股票趋势预测 146

    6.8 本章小结 153

    第7章 Benchmark使用详解 154

    7.1 Benchmark简介 154

    7.1.1 Intel Hibench与Berkeley BigDataBench 155

    7.1.2 Hadoop GridMix 157

    7.1.3 Bigbench、BigDataBenchmark与TPC-DS 158

    7.1.4 其他Benchmark 161

    7.2 Benchmark的组成 162

    7.2.1 数据集 162

    7.2.2 工作负载 163

    7.2.3 度量指标 167

    7.3 Benchmark的使用 168

    7.3.1 使用Hibench 168

    7.3.2 使用TPC-DS 170

    7.3.3 使用BigDataBench 172

    7.4 本章小结 176

    第8章 BDAS简介 177

    8.1 SQL on Spark 177

    8.1.1 使用Spark SQL的原因 178

    8.1.2 Spark SQL架构分析 179

    8.1.3 Shark简介 182

    8.1.4 Hive on Spark 184

    8.1.5 未来展望 185

    8.2 Spark Streaming 185

    8.2.1 Spark Streaming简介 186

    8.2.2 Spark Streaming架构 188

    8.2.3 Spark Streaming原理剖析 189

    8.2.4 Spark Streaming调优 198

    8.2.5 Spark Streaming 实例 198

    8.3 GraphX 205

    8.3.1 GraphX简介 205

    8.3.2 GraphX的使用 206

    8.3.3 GraphX架构 209

    8.3.4 运行实例 211

    8.4 MLlib 215

    8.4.1 MLlib简介 217

    8.4.2 MLlib的数据存储 219

    8.4.3 数据转换为向量(向量空间模型VSM) 222

    8.4.4 MLlib中的聚类和分类 223

    8.4.5 算法应用实例 228

    8.4.6 利用MLlib进行电影推荐 230

    8.5 本章小结 237

    第9章 Spark性能调优 238

    9.1 配置参数 238

    9.2 调优技巧 239

    9.2.1 调度与分区优化 240

    9.2.2 内存存储优化 243

    9.2.3 网络传输优化 249

    9.2.4 序列化与压缩 251

    9.2.5 其他优化方法 253

    9.3 本章小结 255

    spark大数据统计计算的性能优化

    正常的大数据计算中,肯定多多少少存在性能瓶颈或者性能优化的问题,主要有一下几种情况:

    (1)通过设置spark提交参数,提大executor的数目或者executor-memory可以解决,这类问题属于第一类。

    (2)数据量过大,即使修改提交参数,也会导致内存使用超出container的内存值,而kill掉的情况。

    本文主要探讨第二种,第二种里面根据数据的分布情况又有如下几种情况,可以分开研究讨论:

    2.1 根据key进行分组的时候,key数目不是很多,但是分组后相同key的数据量非常大。

    2.2 本来key的数量就很多,即使分组,单个分组数据少,但是组过于多,(在分区没有调节之前)单个容器要处理的task也是非常多的。

    针对2.1的问题: 主要是增加组,从而保证每组的数据减少,从而避免内存溢出。

    伪代码如下,假设row的第一个值是作为聚合key使用:

    dataRdd.map(x=>((new Random).nextInt(10) + "_" + row(0).asInstanceOf[String], x))//第一步给key增加一个随机数(增加数据的分组)

    .reduceBykey((x:Row, y:Row)=>{...}) //进行第一次的聚合

    .map(x=>(x._1.split("_")(1), x._2)) //去掉随机数前缀

    .redubceBykey((x:Row, y:Row)=>{...}) //进行第二次聚合

    针对2.2的数据情况,如果数据字段都是基本类型、string类型的话,建议采用reduceBykey而不要采用groupBykey(减少网络传输开销)。

    如果存在复杂字段Array[Struct[]] 类似复杂组合类型字段的时候,进行reduceBykey时,往往行与行之间的转换存在困难,也只能采用groupByKey.

    如何优化这类问题,目前只能通过增加聚合的分区数进行解决(当然你可以同时增加资源)

    groupByKey(key, numPartions)

    reduceByKey(key, numPartions)

    通过增加numPartions 分区数,从而使得每个task处理分区的数据减少,避免内存溢出,超过container指定的大小。

    备注:特别是使用spark-sql的项目,一定要注意sql的性能,不要引起全表扫描,这个也比较消耗内存。

    如何查看是否引起全表扫描,可以查看sql的执行计划

    在hive-shell的命令窗口中:

    explain select * from xxx ...;

    通过查看行数就可以定义。

    spark大数据处理技术应用与性能优化版截图

    大数据技术丛书

    Spark大数据处理:技术、应用与性能优化

    高彦杰 著

    ISBN:978-7-111-48386-1

    本书纸版由机械工业出版社于2014年出版,电子版由华章分社(北京华章图文信息有限公

    司,北京奥维博世图书发行有限公司)全球范围内制作与发行。

    版权所有,侵权必究

    客服热线:+ 86-10-68995265

    客服信箱:service@bbbvip.com

    官方网址:www.hzmedia.com.cn

    新浪微博 @研发书局

    腾讯微博 @yanfabook目录

    前言

    第1章 Spark简介

    1.1 Spark是什么

    1.2 Spark生态系统BDAS

    1.3 Spark架构

    1.4 Spark分布式架构与单机多核架构的异同

    1.5 Spark的企业级应用

    1.5.1 Spark在Amazon中的应用

    1.5.2 Spark在Yahoo!的应用

    1.5.3 Spark在西班牙电信的应用

    1.5.4 Spark在淘宝的应用

    1.6 本章小结

    第2章 Spark集群的安装与部署

    2.1 Spark的安装与部署

    2.1.1 在Linux集群上安装与配置Spark

    2.1.2 在Windows上安装与配置Spark

    2.2 Spark集群初试

    2.3 本章小结

    第3章 Spark计算模型

    3.1 Spark程序模型

    3.2 弹性分布式数据集

    3.2.1 RDD简介

    3.2.2 RDD与分布式共享内存的异同

    3.2.3 Spark的数据存储3.3 Spark算子分类及功能

    3.3.1 Value型Transformation算子

    3.3.2 Key-Value型Transformation算子

    3.3.3 Actions算子

    3.4 本章小结

    第4章 Spark工作机制详解

    4.1 Spark应用执行机制

    4.1.1 Spark执行机制总览

    4.1.2 Spark应用的概念

    4.1.3 应用提交与执行方式

    4.2 Spark调度与任务分配模块

    4.2.1 Spark应用程序之间的调度

    4.2.2 Spark应用程序内Job的调度

    4.2.3 Stage和TaskSetManager调度方式

    4.2.4 Task调度

    4.3 Spark IO机制

    4.3.1 序列化

    4.3.2 压缩

    4.3.3 Spark块管理

    4.4 Spark通信模块

    4.4.1 通信框架AKKA

    4.4.2 Client、Master和Worker间的通信

    4.5 容错机制

    4.5.1 Lineage机制

    4.5.2 Checkpoint机制

    4.6 Shuffle机制4.7 本章小结

    第5章 Spark开发环境配置及流程

    5.1 Spark应用开发环境配置

    5.1.1 使用Intellij开发Spark程序

    5.1.2 使用Eclipse开发Spark程序

    5.1.3 使用SBT构建Spark程序

    5.1.4 使用Spark Shell开发运行Spark程序

    5.2 远程调试Spark程序

    5.3 Spark编译

    5.4 配置Spark源码阅读环境

    5.5 本章小结

    第6章 Spark编程实战

    6.1 WordCount

    6.2 Top K

    6.3 中位数

    6.4 倒排索引

    6.5 CountOnce

    6.6 倾斜连接

    6.7 股票趋势预测

    6.8 本章小结

    第7章 Benchmark使用详解

    7.1 Benchmark简介

    7.1.1 Intel Hibench与Berkeley BigDataBench

    7.1.2 Hadoop GridMix

    7.1.3 Bigbench、BigDataBenchmark与TPC-DS

    7.1.4 其他Benchmark7.2 Benchmark的组成

    7.2.1 数据集

    7.2.2 工作负载

    7.2.3 度量指标

    7.3 Benchmark的使用

    7.3.1 使用Hibench

    7.3.2 使用TPC-DS

    7.3.3 使用BigDataBench

    7.4 本章小结

    第8章 BDAS简介

    8.1 SQL on Spark

    8.1.1 使用Spark SQL的原因

    8.1.2 Spark SQL架构分析

    8.1.3 Shark简介

    8.1.4 Hive on Spark

    8.1.5 未来展望

    8.2 Spark Streaming

    8.2.1 Spark Streaming简介

    8.2.2 Spark Streaming架构

    8.2.3 Spark Streaming原理剖析

    8.2.4 Spark Streaming调优

    8.2.5 Spark Streaming实例

    8.3 GraphX

    8.3.1 GraphX简介

    8.3.2 GraphX的使用

    8.3.3 GraphX架构8.3.4 运行实例

    8.4 MLlib

    8.4.1 MLlib简介

    8.4.2 MLlib的数据存储

    8.4.3 数据转换为向量(向量空间模型VSM)

    8.4.4 MLlib中的聚类和分类

    8.4.5 算法应用实例

    8.4.6 利用MLlib进行电影推荐

    8.5 本章小结

    第9章 Spark性能调优

    9.1 配置参数

    9.2 调优技巧

    9.2.1 调度与分区优化

    9.2.2 内存存储优化

    9.2.3 网络传输优化

    9.2.4 序列化与压缩

    9.2.5 其他优化方法

    9.3 本章小结前言

    Spark是发源于美国加州大学伯克利分校AMPLab的大数据分析平台,它立足于内存计

    算,从多迭代批量处理出发,兼顾数据仓库、流处理和图计算等多种计算范式,是大数据系

    统领域的全栈计算平台。Spark当下已成为Apache基金会的顶级开源项目,拥有庞大的社区

    支持,技术也逐渐走向成熟。

    为什么要写这本书

    大数据还在如火如荼地发展着,突然之间,Spark就火了。还记得最开始接触Spark技术

    时资料匮乏,只有官方文档和源码可以作为研究学习的资料。写一本Spark系统方面的技术

    书籍,是我持续了很久的一个想法。由于学习和工作较为紧张,最初只是通过几篇笔记在博

    客中分享自己学习Spark过程的点滴,但是随着时间的推移,笔记不断增多,最终还是打算

    将笔记整理成书,也算是一个总结和分享。

    在国外Yahoo!、Intel、Amazon、Cloudera等公司率先应用并推广Spark技术,在国内

    淘宝、腾讯、网易、星环等公司敢为人先,并乐于分享。在随后的发展中,IBM、MapR、Hortonworks、微策略等公司纷纷将Spark融进现有解决方案,并加入Spark阵营。Spark在工

    业界的应用也呈星火燎原之势。

    随着Spark技术在国内的大范围落地、Spark中国峰会的召开,及各地meetup的火爆举

    行,开源软件Spark也因此水涨船高。随着大数据相关技术和产业的逐渐成熟,公司生产环

    境往往需要同时进行多种类型的大数据分析作业:批处理、各种机器学习、流式计算、图计

    算、SQL查询等。在Spark出现前,要在一个平台内同时完成以上数种大数据分析任务,就

    不得不与多套独立的系统打交道,这需要系统间进行代价较大的数据转储,但是这无疑会增

    加运维负担。在1年之前,关注Spark的人和公司不多,由于它包含的软件种类多,版本升级较快,技

    术较为新颖,初学者难以在有限的时间内快速掌握Spark蕴含的价值。同时国内缺少一本实

    践与理论相结合的Spark书籍,很多Spark初学者和开发人员只能参考网络上零星的Spark技

    术相关博客,自己一点一滴地阅读源码和文档,缓慢地学习Spark。本书也正是为了解决上

    面的问题而编写的。

    本书从一个系统化的视角,秉承大道至简的主导思想,介绍Spark中最值得关注的内

    容,讲解Spark部署、开发实战,并结合Spark的运行机制及拓展,帮读者开启Spark技术之

    旅。

    本书特色

    本书是国内首本系统讲解Spark编程实战的书籍,涵盖Spark技术的方方面面。

    1)对Spark的架构、运行机制、系统环境搭建、测试和调优进行深入讲解,以期让读者

    知其所以然。讲述Spark最核心的技术内容,以激发读者的联想,进而衍化至繁。

    2)实战部分不但给出编程示例,还给出可拓展的应用场景。

    3)剖析BDAS生态系统的主要组件的原理和应用,让读者充分了解Spark生态系统。

    本书的理论和实战安排得当,突破传统讲解方式,使读者读而不厌。

    本书中一些讲解实操部署和示例的章节,比较适合作为运维和开发人员工作时手边的

    书;运行机制深入分析方面的章节,比较适合架构师和Spark研究人员,可帮他们拓展解决

    问题的思路。

    读者对象·Spark初学者

    ·Spark二次开发人员

    ·Spark应用开发人员

    ·Spark运维工程师

    ·开源软件爱好者

    ·其他对大数据技术感兴趣的人员

    如何阅读本书

    本书分为两大部分,共计9章内容。

    第1章 从Spark概念出发,介绍了Spark的来龙去脉,阐述Spark生态系统全貌。

    第2章 详细介绍了Spark在Linux集群和Windows上如何进行部署和安装。

    第3章 详细介绍了Spark的计算模型,RDD的概念与原理,RDD上的函数算子的原理和

    使用,广播和累加变量。

    第4章 详细介绍了Spark应用执行机制、Spark调度与任务分配、Spark IO机制、Spark通信模块、容错机制、Shuffle机制,并对Spark机制原理进行了深入剖析。

    第5章 从实际出发,详细介绍了如何在Intellij、Eclipse中配置开发环境,如何使用

    SBT构建项目,如何使用SparkShell进行交互式分析、远程调试和编译Spark源码,以及如何

    构建Spark源码阅读环境。

    第6章 由浅入深,详细介绍了Spark的编程案例,通过WordCount、Top K到倾斜连接等,以帮助读者快速掌握开发Spark程序的技巧。

    第7章 展开介绍了主流的大数据Benchmark的原理,并对比了Benchmark优劣势,指

    导Spark系统性能测试和性能问题诊断。

    第8章 围绕Spark生态系统,介绍了Spark之上的SQL on Spark、Spark Streaming、GraphX、MLlib的原理和应用。

    第9章 详细介绍了如何对Spark进行性能调优,以及调优方法的原理。

    如果您是一位有着一定经验的资深开发人员,能够理解Spark的相关基础知识和使用技

    巧,那么可以直接阅读4章、7章、8章、9章。如果你是一名初学者,请一定从第1章的基础

    知识开始学起。

    勘误和支持

    由于笔者的水平有限,编写时间仓促,书中难免会出现一些错误或者不准确的地方,恳

    请读者批评指正。如果您有更多的宝贵意见,欢迎访问我的个人Github上的Spark大数据处

    理专版:https:github.comYanjieGaoSparkInAction,您可以将书中的错误提交PR或者进

    行评论,我会尽量在线上为读者提供最满意的解答。您也可以通过微博@高彦杰gyj、微信

    公共号@Spark大数据、博客http:blog.csdn.netgaoyanjie55或者邮箱

    gaoyanjie55@163.com联系到我,期待能够得到读者朋友们的真挚反馈,在技术之路上互勉

    共进。

    致谢

    感谢中国人民大学的杜小勇老师、何军老师、陈跃国老师,是老师们将我带进大数据技

    术领域,教授我专业知识与学习方法,并在每一次迷茫时给予我鼓励与支持。感谢微软亚洲研究院的闫莺老师和其他老师及同事,在实习工作中给予我的帮助和指

    导。

    感谢IBM中国研究院的陈冠诚老师和其他老师及同事,在实习工作中给予我的帮助和指

    导。

    感谢连城、明风、Daoyuan Wang等大牛以及Michael Armbrust、Reynold Xin、Sean

    Owen等多位社区大牛,在开发和技术学习中对我的点拨和指导,以及社区的各位技术专家

    们的博客文章。本书多处引用了他们的观点和思想。

    感谢机械工业出版社华章公司的首席策划杨福川和编辑高婧雅,在近半年的时间中始终

    支持我的写作,给我鼓励和帮助,引导我顺利完成全部书稿。

    特别致谢

    最后,我要特别感谢我的女友蒋丹彤对我的支持,我为写作这本书,牺牲了很多陪伴你

    的时间。同时也要感谢你花了很大的精力帮助我进行书稿校对。正因为有了你的付出与支

    持,我才能坚持写下去。

    感谢我的父母、姐姐,有了你们的帮助和支持,我才有时间和精力去完成全部写作。

    谨以此书献给我最亲爱的家人,以及众多热爱大数据技术的朋友们!第1章 Spark简介

    本章主要介绍Spark大数据计算框架、架构、计算模型和数据管理策略及Spark在工业界

    的应用。围绕Spark的BDAS项目及其子项目进行了简要介绍。目前,Spark生态系统已经发

    展成为一个包含多个子项目的集合,其中包含SparkSQL、Spark Streaming、GraphX、MLlib

    等子项目,本章只进行简要介绍,后续章节再详细阐述。1.1 Spark是什么

    Spark是基于内存计算的大数据并行计算框架。Spark基于内存计算,提高了在大数据环

    境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量

    廉价硬件之上,形成集群。

    Spark于2009年诞生于加州大学伯克利分校AMPLab。目前,已经成为Apache软件基金

    会旗下的顶级开源项目。下面是Spark的发展历程。

    1.Spark的历史与发展

    ·2009年:Spark诞生于AMPLab。

    ·2010年:开源。

    ·2013年6月:Apache孵化器项目。

    ·2014年2月:Apache顶级项目。

    ·2014年2月:大数据公司Cloudera宣称加大Spark框架的投入来取代MapReduce。

    ·2014年4月:大数据公司MapR投入Spark阵营,Apache Mahout放弃MapReduce,将使

    用Spark作为计算引擎。

    ·2014年5月:Pivotal Hadoop集成Spark全栈。

    ·2014年5月30日:Spark 1.0.0发布。

    ·2014年6月:Spark 2014峰会在旧金山召开。

    ·2014年7月:Hive on Spark项目启动。目前AMPLab和Databricks负责整个项目的开发维护,很多公司,如Yahoo!、Intel等参

    与到Spark的开发中,同时很多开源爱好者积极参与Spark的更新与维护。

    AMPLab开发以Spark为核心的BDAS时提出的目标是:one stack to rule them all,也就

    是说在一套软件栈内完成各种大数据分析任务。相对于MapReduce上的批量计算、迭代型计

    算以及基于Hive的SQL查询,Spark可以带来上百倍的性能提升。目前Spark的生态系统日趋

    完善,Spark SQL的发布、Hive on Spark项目的启动以及大量大数据公司对Spark全栈的支

    持,让Spark的数据分析范式更加丰富。

    2.Spark之于Hadoop

    更准确地说,Spark是一个计算框架,而Hadoop中包含计算框架MapReduce和分布式文

    件系统HDFS,Hadoop更广泛地说还包括在其生态系统上的其他系统,如Hbase、Hive等。

    Spark是MapReduce的替代方案,而且兼容HDFS、Hive等分布式存储层,可融入

    Hadoop的生态系统,以弥补缺失MapReduce的不足。

    Spark相比Hadoop MapReduce的优势[1]

    如下。

    (1)中间结果输出

    基于MapReduce的计算引擎通常会将中间结果输出到磁盘上,进行存储和容错。出于任

    务管道承接的考虑,当一些查询翻译到MapReduce任务时,往往会产生多个Stage,而这些

    串联的Stage又依赖于底层文件系统(如HDFS)来存储每一个Stage的输出结果。

    Spark将执行模型抽象为通用的有向无环图执行计划(DAG),这可以将多Stage的任务

    串联或者并行执行,而无须将Stage中间结果输出到HDFS中。类似的引擎包括Dryad、Tez。

    (2)数据格式和内存布局由于MapReduce Schema on Read处理方式会引起较大的处理开销。Spark抽象出分布

    式内存存储结构弹性分布式数据集RDD,进行数据的存储。RDD能支持粗粒度写操作,但对

    于读取操作,RDD可以精确到每条记录,这使得RDD可以用来作为分布式索引。Spark的特

    性是能够控制数据在不同节点上的分区,用户可以自定义分区策略,如Hash分区等。Shark

    和Spark SQL在Spark的基础之上实现了列存储和列存储压缩。

    (3)执行策略

    MapReduce在数据Shuffle之前花费了大量的时间来排序,Spark则可减轻上述问题带来

    的开销。因为Spark任务在Shuffle中不是所有情景都需要排序,所以支持基于Hash的分布式

    聚合,调度中采用更为通用的任务执行计划图(DAG),每一轮次的输出结果在内存缓存。

    (4)任务调度的开销

    传统的MapReduce系统,如Hadoop,是为了运行长达数小时的批量作业而设计的,在

    某些极端情况下,提交一个任务的延迟非常高。

    Spark采用了事件驱动的类库AKKA来启动任务,通过线程池复用线程来避免进程或线程

    启动和切换开销。

    3.Spark能带来什么

    Spark的一站式解决方案有很多的优势,具体如下。

    (1)打造全栈多计算范式的高效数据流水线

    Spark支持复杂查询。在简单的“map”及“reduce”操作之外,Spark还支持SQL查询、流

    式计算、机器学习和图算法。同时,用户可以在同一个工作流中无缝搭配这些计算范式。

    (2)轻量级快速处理Spark 1.0核心代码只有4万行。这是由于Scala语言的简洁和丰富的表达力,以及Spark

    充分利用和集成Hadoop等其他第三方组件,同时着眼于大数据处理,数据处理速度是至关

    重要的,Spark通过将中间结果缓存在内存减少磁盘IO来达到性能的提升。

    (3)易于使用,Spark支持多语言

    Spark支持通过Scala、Java及Python编写程序,这允许开发者在自己熟悉的语言环境下

    进行工作。它自带了80多个算子,同时允许在Shell中进行交互式计算。用户可以利用Spark

    像书写单机程序一样书写分布式程序,轻松利用Spark搭建大数据内存计算平台并充分利用

    内存计算,实现海量数据的实时处理。

    (4)与HDFS等存储层兼容

    Spark可以独立运行,除了可以运行在当下的YARN等集群管理系统之外,它还可以读取

    已有的任何Hadoop数据。这是个非常大的优势,它可以运行在任何Hadoop数据源上,如

    Hive、HBase、HDFS等。这个特性让用户可以轻易迁移已有的持久化层数据。

    (5)社区活跃度高

    Spark起源于2009年,当下已有超过50个机构、260个工程师贡献过代码。开源系统的

    发展不应只看一时之快,更重要的是支持一个活跃的社区和强大的生态系统。

    同时我们也应该看到Spark并不是完美的,RDD模型适合的是粗粒度的全局数据并行计

    算。不适合细粒度的、需要异步更新的计算。对于一些计算需求,如果要针对特定工作负载

    达到最优性能,还是需要使用一些其他的大数据系统。例如,图计算领域的GraphLab在特

    定计算负载性能上优于GraphX,流计算中的Storm在实时性要求很高的场合要比Spark

    Streaming更胜一筹。

    随着Spark发展势头日趋迅猛,它已被广泛应用于Yahoo!、Twitter、阿里巴巴、百

    度、网易、英特尔等各大公司的生产环境中。[1] 参见论文:Reynold Shi Xin,Joshua Rosen,Matei Zaharia,Michael Franklin,Scott

    Shenker,Ion Stoica Shark:SQL and Rich Analytics at Scale。1.2 Spark生态系统BDAS

    目前,Spark已经发展成为包含众多子项目的大数据计算平台。伯克利将Spark的整个生

    态系统称为伯克利数据分析栈(BDAS)。其核心框架是Spark,同时BDAS涵盖支持结构化

    数据SQL查询与分析的查询引擎Spark SQL和Shark,提供机器学习功能的系统MLbase及底层

    的分布式机器学习库MLlib、并行图计算框架GraphX、流计算框架Spark Streaming、采样近

    似计算查询引擎BlinkDB、内存分布式文件系统Tachyon、资源管理框架Mesos等子项目。这

    些子项目在Spark上层提供了更高层、更丰富的计算范式。

    图1-1为BDAS的项目结构图。

    图1-1 伯克利数据分析栈(BDAS)项目结构图

    下面对BDAS的各个子项目进行更详细的介绍。

    (1)Spark

    Spark是整个BDAS的核心组件,是一个大数据分布式编程框架,不仅实现了MapReduce

    的算子map函数和reduce函数及计算模型,还提供更为丰富的算子,如filter、join、groupByKey等。Spark将分布式数据抽象为弹性分布式数据集(RDD),实现了应用任务调

    度、RPC、序列化和压缩,并为运行在其上的上层组件提供API。其底层采用Scala这种函数式语言书写而成,并且所提供的API深度借鉴Scala函数式的编程思想,提供与Scala类似的

    编程接口。

    图1-2为Spark的处理流程(主要对象为RDD)。

    图1-2 Spark的任务处理流程图

    Spark将数据在分布式环境下分区,然后将作业转化为有向无环图(DAG),并分阶段

    进行DAG的调度和任务的分布式并行处理。

    (2)Shark

    Shark是构建在Spark和Hive基础之上的数据仓库。目前,Shark已经完成学术使命,终

    止开发,但其架构和原理仍具有借鉴意义。它提供了能够查询Hive中所存储数据的一套SQL

    接口,兼容现有的Hive QL语法。这样,熟悉Hive QL或者SQL的用户可以基于Shark进行快

    速的Ad-Hoc、Reporting等类型的SQL查询。Shark底层复用Hive的解析器、优化器以及元数

    据存储和序列化接口。Shark会将Hive QL编译转化为一组Spark任务,进行分布式运算。

    (3)Spark SQL

    Spark SQL提供在大数据上的SQL查询功能,类似于Shark在整个生态系统的角色,它们可以统称为SQL on Spark。之前,Shark的查询编译和优化器依赖于Hive,使得Shark不得不

    维护一套Hive分支,而Spark SQL使用Catalyst做查询解析和优化器,并在底层使用Spark作

    为执行引擎实现SQL的Operator。用户可以在Spark上直接书写SQL,相当于为Spark扩充了

    一套SQL算子,这无疑更加丰富了Spark的算子和功能,同时Spark SQL不断兼容不同的持久

    化存储(如HDFS、Hive等),为其发展奠定广阔的空间。

    (4)Spark Streaming

    Spark Streaming通过将流数据按指定时间片累积为RDD,然后将每个RDD进行批处

    理,进而实现大规模的流数据处理。其吞吐量能够超越现有主流流处理框架Storm,并提供

    丰富的API用于流数据计算。

    (5)GraphX

    GraphX基于BSP模型,在Spark之上封装类似Pregel的接口,进行大规模同步全局的图

    计算,尤其是当用户进行多轮迭代时,基于Spark内存计算的优势尤为明显。

    (6)Tachyon

    Tachyon是一个分布式内存文件系统,可以理解为内存中的HDFS。为了提供更高的性

    能,将数据存储剥离Java Heap。用户可以基于Tachyon实现RDD或者文件的跨应用共享,并提供高容错机制,保证数据的可靠性。

    (7)Mesos

    Mesos是一个资源管理框架[1]

    ,提供类似于YARN的功能。用户可以在其中插件式地运

    行Spark、MapReduce、Tez等计算框架的任务。Mesos会对资源和任务进行隔离,并实现高

    效的资源任务调度。

    (8)BlinkDBBlinkDB是一个用于在海量数据上进行交互式SQL的近似查询引擎。它允许用户通过在

    查询准确性和查询响应时间之间做出权衡,完成近似查询。其数据的精度被控制在允许的误

    差范围内。为了达到这个目标,BlinkDB的核心思想是:通过一个自适应优化框架,随着时

    间的推移,从原始数据建立并维护一组多维样本;通过一个动态样本选择策略,选择一个适

    当大小的示例,然后基于查询的准确性和响应时间满足用户查询需求。

    [1] Spark自带的资源管理框架是Standalone。1.3 Spark架构

    从上文介绍可以看出,Spark是整个BDAS的核心。生态系统中的各个组件通过Spark来

    实现对分布式并行任务处理的程序支持。

    1.Spark的代码结构

    图1-3展示了Spark-1.0的代码结构和代码量(不包含Test和Sample代码),读者可以通

    过代码架构对Spark的整体组件有一个初步了解,正是这些代码模块构成了Spark架构中的各

    个组件,同时读者可以通过代码模块的脉络阅读与剖析源码,这对于了解Spark的架构和实

    现细节都是很有帮助的。

    下面对图1-3中的各模块进行简要介绍。

    scheduler:文件夹中含有负责整体的Spark应用、任务调度的代码。

    broadcast:含有Broadcast(广播变量)的实现代码,API中是Java和Python API的实

    现。图1-3 Spark代码结构和代码量

    deploy:含有Spark部署与启动运行的代码。

    common:不是一个文件夹,而是代表Spark通用的类和逻辑实现,有5000行代码。

    metrics:是运行时状态监控逻辑代码,Executor中含有Worker节点负责计算的逻辑代

    码。

    partial:含有近似评估代码。

    network:含有集群通信模块代码。

    serializer:含有序列化模块的代码。

    storage:含有存储模块的代码。

    ui:含有监控界面的代码逻辑。其他的代码模块分别是对Spark生态系统中其他组件的

    实现。streaming:是Spark Streaming的实现代码。

    YARN:是Spark on YARN的部分实现代码。

    graphx:含有GraphX实现代码。

    interpreter:代码交互式Shell的代码量为3300行。

    mllib:代表MLlib算法实现的代码量。

    sql代表Spark SQL的代码量。

    2.Spark的架构

    Spark架构采用了分布式计算中的Master-Slave模型。Master是对应集群中的含有

    Master进程的节点,Slave是集群中含有Worker进程的节点。Master作为整个集群的控制

    器,负责整个集群的正常运行;Worker相当于是计算节点,接收主节点命令与进行状态汇

    报;Executor负责任务的执行;Client作为用户的客户端负责提交应用,Driver负责控制一个

    应用的执行,如图1-4所示。

    图1-4 Spark架构图

    Spark集群部署后,需要在主节点和从节点分别启动Master进程和Worker进程,对整个集群进行控制。在一个Spark应用的执行过程中,Driver和Worker是两个重要角色。Driver程

    序是应用逻辑执行的起点,负责作业的调度,即Task任务的分发,而多个Worker用来管理

    计算节点和创建Executor并行处理任务。在执行阶段,Driver会将Task和Task所依赖的file和

    jar序列化后传递给对应的Worker机器,同时Executor对相应数据分区的任务进行处理。

    下面详细介绍Spark的架构中的基本组件。

    ·ClusterManager:在Standalone模式中即为Master(主节点),控制整个集群,监控

    Worker。在YARN模式中为资源管理器。

    ·Worker:从节点,负责控制计算节点,启动Executor或Driver。在YARN模式中为

    NodeManager,负责计算节点的控制。

    ·Driver:运行Application的main函数并创建SparkContext。

    ·Executor:执行器,在worker node上执行任务的组件、用于启动线程池运行任务。每

    个Application拥有独立的一组Executors。

    ·SparkContext:整个应用的上下文,控制应用的生命周期。

    ·RDD:Spark的基本计算单元,一组RDD可形成执行的有向无环图RDD Graph。

    ·DAG Scheduler:根据作业(Job)构建基于Stage的DAG,并提交Stage给

    TaskScheduler。

    ·TaskScheduler:将任务(Task)分发给Executor执行。

    ·SparkEnv:线程级别的上下文,存储运行时的重要组件的引用。

    SparkEnv内创建并包含如下一些重要组件的引用。

    ·MapOutPutTracker:负责Shuffle元信息的存储。·BroadcastManager:负责广播变量的控制与元信息的存储。

    ·BlockManager:负责存储管理、创建和查找块。

    ·MetricsSystem:监控运行时性能指标信息。

    ·SparkConf:负责存储配置信息。

    Spark的整体流程为:Client提交应用,Master找到一个Worker启动Driver,Driver向

    Master或者资源管理器申请资源,之后将应用转化为RDD Graph,再由DAGScheduler将

    RDD Graph转化为Stage的有向无环图提交给TaskScheduler,由TaskScheduler提交任务给

    Executor执行。在任务执行的过程中,其他组件协同工作,确保整个应用顺利执行。

    3.Spark运行逻辑

    如图1-5所示,在Spark应用中,整个执行流程在逻辑上会形成有向无环图(DAG)。

    Action算子触发之后,将所有累积的算子形成一个有向无环图,然后由调度器调度该图上的

    任务进行运算。Spark的调度方式与MapReduce有所不同。Spark根据RDD之间不同的依赖关

    系切分形成不同的阶段(Stage),一个阶段包含一系列函数执行流水线。图中的A、B、C、D、E、F分别代表不同的RDD,RDD内的方框代表分区。数据从HDFS输入Spark,形成

    RDD A和RDD C,RDD C上执行map操作,转换为RDD D,RDD B和RDD E执行join操作,转

    换为F,而在B和E连接转化为F的过程中又会执行Shuffle,最后RDD F通过函数

    saveAsSequenceFile输出并保存到HDFS中。图1-5 Spark执行有向无环图1.4 Spark分布式架构与单机多核架构的异同

    我们通常所说的分布式系统主要指的是分布式软件系统,它是在通信网络互连的多处理

    机的架构上执行任务的软件系统,包括分布式操作系统、分布式程序设计语言、分布式文件

    系统和分布式数据库系统等。Spark是分布式软件系统中的分布式计算框架,基于Spark可以

    编写分布式计算程序和软件。为了整体宏观把握和理解分布式系统,可以将一个集群视为一

    台计算机。分布式计算框架的最终目的是方便用户编程,最后达到像原来编写单机程序一样

    编写分布式程序。但是分布式编程与编写单机程序还是存在不同点的。由于分布式架构和单

    机的架构有所不同,存在内存和磁盘的共享问题,这也是我们在书写和优化程序的过程中需

    要注意的地方。分布式架构与单机架构的对比如图1-6所示。图1-6 分布式体系结构与单机体系结构的对比

    1)在单机多核环境下,多CPU共享内存和磁盘。当系统所需的计算和存储资源不够,需要扩展CPU和存储时,单机多核系统显得力不从心。

    2)大规模分布式并行处理系统是由许多松耦合的处理单元组成的,要注意的是,这里

    指的是处理单元而非处理器。每个单元内的CPU都有自己私有的资源,如总线、内存、硬盘

    等。这种结构最大的特点在于不共享资源。在不共享资源(Share Nothing)的分布式架构

    下,节点可以实现无限扩展,即计算能力和存储的扩展性可以成倍增长。

    在分布式运算下,数据尽量本地运算,减少网络IO开销。由于大规模分布式系统要在不同处理单元之间传送信息,在网络传输少时,系统可以充分发挥资源的优势,达到高效

    率。也就是说,如果操作相互之间没有什么关系,处理单元之间需要进行的通信比较少,则

    采用分布式系统更好。因此,分布式系统在决策支持(DSS)和数据挖掘(Data Mining)方

    面具有优势。

    Spark正是基于大规模分布式并行架构开发,因此能够按需进行计算能力与存储能力的

    扩展,在应对大数据挑战时显得游刃有余,同时保证容错性,让用户放心地进行大数据分

    析。1.5 Spark的企业级应用

    随着企业数据量的增长,对大数据的处理和分析已经成为企业的迫切需求。Spark作为

    Hadoop的替代者,引起学术界和工业界的普遍兴趣,大量应用在工业界落地,许多科研院

    校开始了对Spark的研究。

    在学术界,Spark得到各院校的关注。Spark源自学术界,最初是由加州大学伯克利分校

    的AMPLab设计开发。国内的中科院、中国人民大学、南京大学、华东师范大学等也开始对

    Spark展开相关研究。涉及Benchmark、SQL、并行算法、性能优化、高可用性等多个方

    面。

    在工业界,Spark已经在互联网领域得到广泛应用。互联网用户群体庞大,需要存储大

    数据并进行数据分析,Spark能够支持多范式的数据分析,解决了大数据分析中迫在眉睫的

    问题。例如,国外Cloudera、MapR等大数据厂商全面支持Spark,微策略等老牌BI厂商也和

    Databricks达成合作关系,Yahoo!使用Spark进行日志分析并积极回馈社区,Amazon在云

    端使用Spark进行分析。国内同样得到很多公司的青睐,淘宝构建Spark on Yarn进行用户交

    易数据分析,使用GraphX进行图谱分析。网易用Spark和Shark对海量数据进行报表和查

    询。腾讯使用Spark进行精准广告推荐。

    下面将选取代表性的Spark应用案例进行分析,以便于读者了解Spark在工业界的应用状

    况。1.5.1 Spark在Amazon中的应用

    亚马逊云计算服务AWS(Amazon Web Services)提供IaaS和PaaS服务。Heroku、Netflix等众多知名公司都将自己的服务托管其上。AWS以Web服务的形式向企业提供IT基础

    设施服务,现在通常称为云计算。云计算的主要优势是能够根据业务发展扩展的较低可变成

    本替代前期资本基础设施费用。利用云,企业无须提前数周或数月来计划和采购服务器及其

    他IT基础设施,即可在几分钟内即时运行成百上千台服务器,并更快达成结果。

    1.亚马逊AWS云服务的内容

    目前亚马逊在EMR中提供了弹性Spark服务,用户可以按需动态分配Spark集群计算节

    点,随着数据规模的增长,扩展自己的Spark数据分析集群,同时在云端的Spark集群可以无

    缝集成亚马逊云端的其他组件,一起构建数据分析流水线。

    亚马逊云计算服务AWS提供的服务包括:亚马逊弹性计算云(Amazon EC2)、亚马逊

    简单存储服务(Amazon S3)、亚马逊弹性MapReduce(Amazon EMR)、亚马逊简单数据

    库(Amazon SimpleDB)、亚马逊简单队列服务(Amazon Simple Queue Service)、Amazon DynamoDB以及Amazon CloudFront等。基于以上的组件,亚马逊开始提供EMR上

    的弹性Spark服务。用户可以像之前使用EMR一样在亚马逊动态申请计算节点,可随着数据

    量和计算需求来动态扩展计算资源,将计算能力水平扩展,按需进行大数据分析。亚马逊提

    供的云服务中已经支持使用Spark集群进行大数据分析。数据可以存储在S3或者Hadoop存储

    层,通过Spark将数据加载进计算集群进行复杂的数据分析。

    亚马逊AWS架构如图1-7所示。图1-7 亚马逊AWS架构

    2.亚马逊的EMR中提供的3种主要组件

    ·Master Node:主节点,负责整体的集群调度与元数据存储。

    ·Core Node:Hadoop节点,负责数据的持久化存储,可以动态扩展资源,如更多的

    CPU Core、更大的内存、更大的HDFS存储空间。为了防止HDFS损坏,不能移除Core

    Node。

    ·Task Node:Spark计算节点,负责执行数据分析任务,不提供HDFS,只负责提供计算

    资源(CPU和内存),可以动态扩展资源,可以增加和移除Task Node。

    3.使用Spark on Amazon EMR的优势

    ·构建速度快:可以在几分钟内构建小规模或者大规模Spark集群,以进行数据分析。

    ·运维成本低:EMR负责整个集群的管理与控制,EMR也会负责失效节点的恢复。·云生态系统数据处理组件丰富:Spark集群可以很好地与Amazon云服务上的其他组件

    无缝集成,利用其他组件构建数据分析管道。例如,Spark可以和EC2 Spot Market、Amazon Redshift、Amazon Data pipeline、Amazon CloudWatch等组合使用。

    ·方便调试:Spark集群的日志可以直接存储到Amazon S3中,方便用户进行日志分析。

    综合以上优势,用户可以真正按需弹性使用与分配计算资源,实现节省计算成本、减轻

    运维压力,在几分钟内构建自己的大数据分析平台。

    4.Spark on Amazon EMR架构解析

    通过图1-8可以看到整个Spark on Amazon EMR的集群架构。下面以图1-8为例,分析用

    户如何在应用场景使用服务。

    图1-8 Amazon Spark on EMR

    构建集群,首先创建一个Master Node作为集群的主节点。之后创建两个Core Node存

    储数据,两个Core Node总共有32GB的内存。但是这些内存是不够Spark进行内存计算的。接下来动态申请16个Task Node,总共256GB内存作为计算节点,进行Spark的数据分析。

    当用户开始分析数据时,Spark RDD的输入既可以来自Core Node中的HDFS,也可以来

    自Amazon S3,还可以通过输入数据创建RDD。用户在RDD上进行各种计算范式的数据分

    析,最终可以将分析结果输出到Core Node的HDFS中,也可以输出到Amazon S3中。

    5.应用案例:构建1000个节点的Spark集群

    读者可以通过下面的步骤,在Amazon EMR上构建自己的1000个节点的Spark数据分析

    平台。

    1)启动1000个节点的集群,这个过程将会花费10~20分钟。

    .elas2c-mapreduce --create –alive--name SparkShark Cluster \--bootstrap-ac2on

    s3:elasBcmapreducesamplesspark0.8.1install-spark-shark.sh--bootstrap-name SparkShark--instance-type m1.xlarge--instance-count 1000

    2)如果希望继续动态增加计算资源,可以输入下面命令增加Task Node。--add-instance-group TASK--instance-count INSTANCE_COUNT--instance-type INSTANCE_TYPE

    执行完步骤1)或者1)、2)后,集群将会处于图1-9所示的等待状态。图1-9 集群细节监控界面

    进入管理界面http:localhost:9091可以查看集群资源使用状况;进入

    http:localhost:8080可以观察Spark集群的状况。Lynx界面如图1-10所示。

    3)加载数据集。

    示例数据集使用Wiki文章数据,总量为4.5TB,有1万亿左右记录。Wiki文章数据存储在

    S3中,下载地址为s3:bigdata-spark-demowikistats。

    下面创建wikistats表,将数据加载进表:

    create external table wikistats

    (

    projectcode string,pagename string,pageviews int,pagesize int)

    ROW FORMAT

    DELIMITED FIELDS

    TERMINTED BY

    LOCATION 's3n:bigdata-spark-demowikistats';

    ALTER TABLE wikistats add partition(dt='2007-12')location 's3n:bigdata-spark-demowikistats20072007-

    12';......图1-10 Lynx界面

    4)分析数据。

    使用Shark获取2014年2月的Top 10页面。用户可以在Shark输入下面的SQL语句进行分

    析。

    Select pagename,sum(pageviews) c from wikistats_cached where dt='2014-01'

    group by pagename order by c desc limit 10;

    这个语句大致花费26s,扫描了250GB的数据。

    云计算带来资源的按需分配,用户可以采用云端的虚机作为大数据分析平台的底层基础

    设施,在上端构建Spark集群,进行大数据分析。随着处理数据量的增加,按需扩展分析节

    点,增加集群的数据分析能力。1.5.2 Spark在Yahoo!的应用

    在Spark技术的研究与应用方面,Yahoo!始终处于领先地位,它将Spark应用于公司的

    各种产品之中。移动App、网站、广告服务、图片服务等服务的后端实时处理框架均采用了

    Spark+Shark的架构。

    在2013年,Yahoo!拥有72656600个页面,有上百万的商品类别,上千个商品和用户

    特征,超过800万用户,每天需要处理海量数据。

    通过图1-11可以看到Yahoo!使用Spark进行数据分析的整体架构。

    图1-11 Yahoo!大数据分析栈

    大数据分析平台架构解析如下。

    整个数据分析栈构建在YARN之上,这是为了让Hadoop和Spark的任务共存。主要包含

    两个主要模块:

    1)离线处理模块:使用MapReduce和Spark+Shark混合架构。由于MapReduce适合进

    行ETL处理,还保留Hadoop进行数据清洗和转换。数据在ETL之后加载进HDFSHCatHive

    数据仓库存储,之后可以通过Spark、Shark进行OLAP数据分析。2)实时处理模块:使用Spark Streaming+Spark+Shark架构进行处理。实时流数据源

    源不断经过Spark Steaming初步处理和分析之后,将数据追加进关系数据库或者NoSQL数据

    库。之后,结合历史数据,使用Spark进行实时数据分析。

    之所以选择Spark,Yahoo!基于以下几点进行考虑。

    1)进行交互式SQL分析的应用需求。

    2)RAM和SSD价格不断下降,数据分析实时性的需求越来越多,大数据急需一个内存

    计算框架进行处理。

    3)程序员熟悉Scala开发,接受Spark学习曲线不陡峭。

    4)Spark的社区活跃度高,开源系统的Bug能够更快地解决。

    5)传统Hadoop生态系统的分析组件在进行复杂数据分析和保证实时性方面表现得力不

    从心。Spark的全栈支持多范式数据分析能够应对多种多样的数据分析需求。

    6)可以无缝将Spark集成进现有的Hadoop处理架构。

    Yahoo!的Spark集群在2013年已经达到9.2TB持久存储、192GB RAM、112节点(每节

    点为SATA 1×500GB(7200转的硬盘))、400GB SSD(1×400GB SATA 300MBs)的集群

    规模。1.5.3 Spark在西班牙电信的应用

    西班牙电信(Telefónica,S.A.)是西班牙的一家电信公司。这是全球第五大固网和移

    动通信运营商。

    Telefónica成立于1924年。在1997年电信市场自由化之前,Telefónica是西班牙唯一的

    电信运营商,至今仍占据主要的市场份额(2004年超过75%)。

    西班牙电信的数据与日俱增,随着数据的增长,网络安全成为一个不可忽视的问题而凸

    显。DDoS攻击、SQL注入攻击、网站置换、账号盗用等网络犯罪频繁发生。如何通过大数

    据分析,预防网络犯罪与正确检测诊断成为迫在眉睫的问题。

    传统的应对方案是,采用中心化的数据存储,收集事件、日志和警告信息,对数据分析

    预警,并对用户行为进行审计。但是随着犯罪多样化与数据分析技术越来越复杂,架构已经

    演变为中心架构服务化,并提供早期预警、离线报告、趋势预测、决策支持和可视化的大数

    据网络安全分析预警策略。

    西班牙电信采用Stratio公司提供的含有Spark的数据分析解决方案构建自身的网络安全

    数据分析栈,将使用的大数据系统缩减了一半,平台复杂性降低,同时处理性能成倍提升。

    整体架构如图1-12所示。

    在架构图中,最顶层通过Kafka不断收集事件、日志、预警等多数据源的信息,形成流

    数据,完成数据集成的功能。接下来Kafka将处理好的数据传输给Storm,Storm将数据混合

    与预处理。最后将数据存储进Cassandra、Mongo和HDFS进行持久化存储,使用Spark进行

    数据分析与预警。

    在数据收集阶段:数据源是多样化的,可能来自DNS日志、用户访问IP、社交媒体数

    据、政府公共数据源等。Kafka到数据源拉取不同数据维度数据。在数据预处理阶段:通过Storm进行数据预处理与规范化。在这个阶段为了能够实时预

    警,采用比Spark Streaming实时性更高的Storm进行处理。

    在数据批处理阶段:数据经过预处理阶段之后将存储到Cassandra中持久化。开发人员

    通过Cassandra进行一些简单的查询和数据报表分析。对于复杂的数据分析,需要使用Spark

    来完成。Spark+Cassandra的架构结合了两个系统的优势。Cassandra的二级索引能够加速

    查询处理。

    Spark对机器学习和图计算等复杂数据分析应对自如,二者组合能够应对常见和复杂的

    数据分析负载。图1-12 西班牙电信数据分析平台1.5.4 Spark在淘宝的应用

    数据挖掘算法有时候需要迭代,每次迭代时间非常长,这是淘宝选择一个更高性能计算

    框架Spark的原因。Spark编程范式更加简洁也是一大原因。另外,GraphX提供图计算的能

    力也是很重要的。

    1.Spark on YARN架构

    Spark的计算调度方式从Mesos到Standalone,即自建Spark计算集群。虽然Standalone

    方式性能与稳定性都得到了提升,但自建集群资源少,需要从云梯集群复制数据,不能满足

    数据挖掘与计算团队业务需求[1]。而Spark on YARN能让Spark计算模型在云梯YARN集群上

    运行,直接读取云梯上的数据,并充分享受云梯YARN集群丰富的计算资源。图1-13为Spark

    on YARN的架构。

    图1-13 Spark on YARN架构

    Spark on YARN架构解析如下。

    基于YARN的Spark作业首先由客户端生成作业信息,提交给ResourceManager,ResourceManager在某一NodeManager汇报时把AppMaster分配给

    NodeManager,NodeManager启动SparkAppMaster,SparkAppMaster启动后初始化作业,然后向ResourceManager申请资源,申请到相应资源后,SparkAppMaster通过RPC让

    NodeManager启动相应的SparkExecutor,SparkExecutor向SparkAppMaster汇报并完成相应

    的任务。此外,SparkClient会通过AppMaster获取作业运行状态。目前,淘宝数据挖掘与计

    算团队通过Spark on YARN已实现MLR、PageRank和JMeans算法,其中MLR已作为生产作业

    运行。

    2.协作系统

    1)Spark Streaming:淘宝在云梯构建基于Spark Streaming的实时流处理框架。Spark

    Streaming适合处理历史数据和实时数据混合的应用需求,能够显著提高流数据处理的吞吐

    量。其对交易数据、用户浏览数据等流数据进行处理和分析,能够更加精准、快速地发现问

    题和进行预测。

    2)GraphX[2]

    :淘宝将交易记录中的物品和人组成大规模图。使用GraphX对这个大图进

    行处理(上亿个节点,几十亿条边)。GraphX能够和现有的Spark平台无缝集成,减少多平

    台的开发代价。

    本节主要介绍了Spark在工业界的应用。Spark起源于学术界,发展于工业界,现在已经

    成为大数据分析不可或缺的计算框架。通过Amazon提供Spark云服务,可以看到Big Data

    on Cloud已经兴起。Yahoo!很早就开始使用Spark,将Spark用于自己的广告平台、商品交

    易数据分析和推荐系统等数据分析领域。同时Yahoo!也积极回馈社区,与社区形成良好的

    互动。Stratio公司为西班牙电信提供基于Spark+Cassandra+Storm架构的数据分析解决方

    案,实现流数据实时处理与离线数据分析兼顾,通过它们的案例可以看到多系统混合提供多

    数据计算范式分析平台是未来的一个趋势。最后介绍国内淘宝公司的Spark应用案例,淘宝

    是国内较早使用Spark的公司,通过Spark进行大规模机器学、图计算以及流数据分析,并积

    极参与社区,与社区形成良好互动,并乐于分享技术经验。希望读者通过企业案例能够全面了解Spark的广泛应用和适用场景。

    [1] 参见沈洪的《深入剖析阿里巴巴云梯YARN集群》,《程序员》,2013.12。

    [2] 参见文章:黄明,吴炜.快刀初试:Spark GraphX在淘宝的实践.程序员,2014.8。1.6 本章小结

    本章首先介绍了Spark分布式计算平台和BDAS。BDAS的核心框架Spark为用户提供了系

    统底层细节透明、编程接口简洁的分布式计算平台。Spark具有计算速度快、实时性高、容

    错性好等突出特点。基于Spark的应用已经逐步落地,尤其是在互联网领域,如淘宝、腾

    讯、网易等公司的发展已经成熟。同时电信、银行等传统行也开始逐步试水Spark并取得了

    较好效果。本章也对Spark的基本情况、架构、运行逻辑等进行了介绍。最后介绍了Spark在

    工业界的应用,读者可以看到Spark的蓬勃发展以及在大数据分析平台中所处的位置及重要

    性。

    读者通过本章可以初步认识和理解Spark,更为底层的细节将在后续章节详细阐述。

    相信读者已经想搭建自己的Spark集群环境一探究竟了,接下来将介绍Spark的安装与配

    置。第2章 Spark集群的安装与部署

    Spark的安装简便,用户可以在官网上下载到最新的软件包,网址为

    http:spark.apache.org。

    Spark最早是为了在Linux平台上使用而开发的,在生产环境中也是部署在Linux平台

    上,但是Spark在UNIX、Windwos和Mac OS X系统上也运行良好。不过,在Windows上运行

    Spark稍显复杂,必须先安装Cygwin以模拟Linux环境,才能安装Spark。

    由于Spark主要使用HDFS充当持久化层,所以完整地使用Spark需要预先安装Hadoop。

    下面介绍Spark集群的安装和部署。2.1 Spark的安装与部署

    Spark在生产环境中,主要部署在安装有Linux系统的集群中。在Linux系统中安装Spark

    需要预先安装JDK、Scala等所需的依赖。由于Spark是计算框架,所以需要预先在集群内有

    搭建好存储数据的持久化层,如HDFS、Hive、Cassandra等。最后用户就可以通过启动脚本

    运行应用了。2.1.1 在Linux集群上安装与配置Spark

    下面介绍如何在Linux集群上安装与配置Spark。

    1.安装JDK

    安装JDK大致分为下面4个步骤。

    1)用户可以在Oracle JDK的官网下载相应版本的JDK,本例以JDK 1.6为例,官网地址

    为http:www.oracle.comtechnetworkjavajavasedownloadsindex.html。

    2)下载后,在解压出的JDK的目录下执行bin文件。

    .jdk-6u38-ea-bin-b04-linux-amd64-31_oct_2012.bin

    3)配置环境变量,在etcprofile增加以下代码。

    JAVA_HOME=homechengxujdk1.6.0_38

    PATH=JAVA_HOMEbin:PATH

    CLASSPATH=.:JAVA_HOMEjrelibrt.jar:JAVA_HOMEjrelibdt.jar:JAVA_HOMEjrelibtools.jar

    export JAVA_HOME PATH CLASSPATH

    4)使profile文件更新生效。

    .etcprofile

    2.安装Scala

    Scala官网提供各个版本的Scala,用户需要根据Spark官方规定的Scala版本进行下载和

    安装。Scala官网地址为http:www.scala-lang.org。

    以Scala-2.10为例进行介绍。

    1)下载scala-2.10.4.tgz。2)在目录下解压:

    tar -xzvf scala-2.10.4.tgz

    3)配置环境变量,在etcprofile中添加下面的内容。

    export SCALA_HOME=homechengxuscala-2.10.4scala-2.10.4

    export PATH={SCALA_HOME}bin:PATH

    4)使profile文件更新生效。

    .etcprofile

    3.配置SSH免密码登录

    在集群管理和配置中有很多工具可以使用。例如,可以采用pssh等Linux工具在集群中

    分发与复制文件,用户也可以自己书写Shell、Python的脚本分发包。

    Spark的Master节点向Worker节点发命令需要通过ssh进行发送,用户不希望Master每发

    送一次命令就输入一次密码,因此需要实现Master无密码登录到所有Worker。

    Master作为客户端,要实现无密码公钥认证,连接到服务端Worker。需要在Master上

    生成一个密钥对,包括一个公钥和一个私钥,然后将公钥复制到Worker上。当Master通过

    ssh连接Woker时,Worker就会生成一个随机数并用Master的公钥对随机数进行加密,发送

    给Worker。Master收到加密数之后再用私钥进行解密,并将解密数回传给Worker,Worker

    确认解密数无误之后,允许Master进行连接。这就是一个公钥认证过程,其间不需要用户手

    工输入密码,主要过程是将Master节点公钥复制到Worker节点上。

    下面介绍如何配置Master与Worker之间的SSH免密码登录。

    1)在Master节点上,执行以下命令。ssh-keygen-trsa

    2)打印日志执行以下命令。

    Generating publicprivate rsa key pair.

    Enter file in which to save the key (root.sshid_rsa):

    回车,设置默认路径

    Enter passphrase (empty for no passphrase):

    回车,设置空密码

    Enter same passphrase again:

    Your identification has been saved in root.sshid_rsa.

    Your public key has been saved in root.sshid_rsa.pub.

    如果是root用户,则在root.ssh目录下生成一个私钥id_rsa和一个公钥id_rsa.pub。

    把Master上的id_rsa.pub文件追加到Worker的authorized_keys内,以

    172.20.14.144(Worker)节点为例。

    3)复制Master的id_rsa.pub文件。

    scp id_rsa.pub root@172.20.14.144:home

    可使用pssh对全部节点分发

    4)登录172.20.14.144(Worker节点),执行以下命令。

    cat homeid_rsa.pub >> root.sshauthorized_keys

    可使用pssh对全部节点分发

    其他的Worker执行同样的操作。

    注意:配置完毕,如果Master仍然不能访问Worker,可以修改Worker的

    authorized_keys文件的权限,命令为chmod 600 authorized_keys。

    4.安装Hadoop

    下面讲解Hadoop的安装过程和步骤。

    (1)下载hadoop-2.2.01)选取一个Hadoop镜像网址,下载Hadoop(官网地址为

    http:hadoop.apache.org)。

    wgethttp:www.trieuvan.comapachehadoopcommon

    hadoop-2.2.0hadoop-2.2.0.tar.gz

    2)解压tar包。

    sudo tar-vxzf hadoop-2.2.0.tar.gz -C usrlocal

    cd usrlocal

    sudo mv hadoop-2.2.0 hadoop

    sudo chown -R hduser:hadoop hadoop

    (2)配置Hadoop环境变量

    1)编辑profile文件。

    vi etcprofile

    2)在profile文件中增加以下内容。

    export JAVA_HOME=usrlibjvmjdk

    export HADOOP_INSTALL=usrlocalhadoop

    export PATH=PATH:HADOOP_INSTALLbin

    export PATH=PATH:HADOOP_INSTALLsbin

    export HADOOP_MAPRED_HOME=HADOOP_INSTALL

    export HADOOP_COMMON_HOME=HADOOP_INSTALL

    export HADOOP_HDFS_HOME=HADOOP_INSTALL

    export YARN_HOME=HADOOP_INSTALL

    通过如上配置就可以让系统找到JDK和Hadoop的安装路径。

    (3)编辑配置文件

    1)进入Hadoop所在目录usrlocalhadoopetchadoop。

    2)配置hadoop-env.sh文件。

    export JAVA_HOME=usrlibjvmjdk3)配置core-site.xml文件。

    

    这里的值指的是默认的HDFS路径

    

    fs.defaultFS

    hdfs:Master:9000

    缓冲区大小:io.file.buffer.size默认是4KB

    

    io.file.buffer.size

    131072

    临时文件夹路径

    

    hadoop.tmp.dir

    file:hometmp

    Abase for other

    temporary directories.

    

    hadoop.proxyuser.hduser.hosts

    

    

    hadoop.proxyuser.hduser.groups

    

    4)配置yarn-site.xml文件。

    

    

    yarn.nodemanager.aux-services

    mapreduce_shuffle

    

    yarn.nodemanager.aux-services.mapreduce.shuffle.class

    org.apache.hadoop.mapred.ShuffleHandler

    resourcemanager的地址

    

    yarn.resourcemanager.address

    Master:8032

    调度器的端口

    

    yarn.resourcemanager.scheduler.address

     Master1:8030

    resource-tracker端口

    

    yarn.resourcemanager.resource-tracker.address

     Master:8031

    resourcemanager管理器端口

    

    yarn.resourcemanager.admin.address

     Master:8033

    ResourceManager 的 Web 端口,监控 job 的资源调度

    

    yarn.resourcemanager.webapp.address

     Master:8088

    5)配置mapred-site.xml文件,加入如下内容。

    

    hadoop对map-reduce运行框架一共提供了3种实现,在mapred-site.xml中通过mapreduce.framework.name这个属性来设置

    为classic、yarn或者local

    

    mapreduce.framework.name

    yarn

    MapReduce JobHistory Server地址

    

    mapreduce.jobhistory.address

    Master:10020

    MapReduce JobHistory Server Web UI地址

    

    mapreduce.jobhistory.webapp.address

    Master:19888

    (4)创建namenode和datanode目录,并配置其相应路径

    1)创建namenode和datanode目录,执行以下命令。

    mkdir hdfsnamenode

    mkdir hdfsdatanode

    2)执行命令后,再次回到目录usrlocalhadoopetchadoop,配置hdfs-site.xml文

    件,在文件中添加如下内容。

    

    配置主节点名和端口号

    

    dfs.namenode.secondary.http-address

    Master:9001

    配置从节点名和端口号

    

    dfs.namenode.name.dir

    file:hdfsnamenode

    配置datanode的数据存储目录

    

    dfs.datanode.data.dir

    file:hdfsdatanode

    配置副本数

    

    dfs.replication

    3

    将dfs.webhdfs.enabled属性设置为true,否则就不能使用webhdfs的LISTSTATUS、LISTFILESTATUS等需要列出文件、文件夹状态的命令,因为这些信息都是由namenode保存的

    

    dfs.webhdfs.enabled

    true

    (5)配置Master和Slave文件

    1)Master文件负责配置主节点的主机名。例如,主节点名为Master,则需要在Master

    文件添加以下内容。

    Master Master为主节点主机名

    2)配置Slaves文件添加从节点主机名,这样主节点就可以通过配置文件找到从节点,和从节点进行通信。例如,以Slave1~Slave5为从节点的主机名,就需要在Slaves文件中添

    加如下信息。

    Slave为从节点主机名

    Slave1

    Slave2

    Slave3

    Slave4

    Slave5

    (6)将Hadoop的所有文件通过pssh分发到各个节点

    执行如下命令。

    .pssh -h hosts.txt -r hadoop

    (7)格式化Namenode(在Hadoop根目录下)

    .binhadoop namenode -format

    (8)启动Hadoop

    .sbinstart-all.sh(9)查看是否配置和启动成功

    如果在x86机器上运行,则通过jps命令,查看相应的JVM进程

    2584 DataNode

    2971 ResourceManager

    3462 Jps

    3179 NodeManager

    2369 NameNode

    2841 SecondaryNameNode

    注意,由于在IBM JVM中没有jps命令,所以需要用户按照下面命令逐个查看。

    ps-aux|grep DataNode 查看DataNode进程

    5.安装Spark

    进入官网下载对应Hadoop版本的Spark程序包(见图2-1),官网地址为

    http:spark.apache.orgdownloads.html。

    图2-1 Spark下载官网

    截止到笔者进行本书写作之时,Spark已经更新到1.0版本。以Spark1.0版本为例,介绍Spark的安装。

    1)下载spark-1.0.0-bin-hadoop2.tgz。

    2)解压tar-xzvf spark-1.0.0-bin-hadoop2.tgz。

    3)配置confspark-env.sh文件

    ①用户可以配置基本的参数,其他更复杂的参数请见官网的配置(Configuration)页

    面,Spark配置(Configuration)地址为:

    http:spark.apache.orgdocslatestconfiguration.html。

    ②编辑confspark-env.sh文件,加入下面的配置参数。

    export SCALA_HOME=pathtoscala-2.10.4

    export SPARK_WORKER_MEMORY=7g

    export SPARK_MASTER_IP=172.16.0.140

    export MASTER=spark:172.16.0.140:7077

    参数SPARK_WORKER_MEMORY决定在每一个Worker节点上可用的最大内存,增加这个

    数值可以在内存中缓存更多数据,但是一定要给Slave的操作系统和其他服务预留足够的内

    存。

    需要配置SPARK_MASTER_IP和MASTER,否则会造成Slave无法注册主机错误。

    4)配置slaves文件。

    编辑confslaves文件,以5个Worker节点为例,将节点的主机名加入slaves文件中。

    Slave1

    Slave2

    Slave3

    Slave4

    Slave5

    6.启动集群(1)Spark启动与关闭

    1)在Spark根目录启动Spark。

    .sbinstart-all.sh

    2)关闭Spark。

    .sbinstop-all.sh

    (2)Hadoop的启动与关闭

    1)在Hadoop根目录启动Hadoop。

    .sbinstart-all.sh

    2)关闭Hadoop。

    .sbinstop-all.sh

    (3)检测是否安装成功

    1)正常状态下的Master节点如下。

    -bash-4.1 jps

    23526 Jps

    2127 Master

    7396 NameNode

    7594 SecondaryNameNode

    7681 ResourceManager

    2)利用ssh登录Worker节点。

    -bash-4.1 ssh slave2

    -bash-4.1 jps

    1405 Worker

    1053 DataNode

    22455 Jps

    31935 NodeManager至此,在Linux集群上安装与配置Spark集群的步骤告一段落。2.1.2 在Windows上安装与配置Spark

    本节介绍在Windows系统上安装Spark的过程。在Windows环境下需要安装Cygwin模拟

    Linux的命令行环境来安装Spark。

    (1)安装JDK

    相对于Linux、Windows的JDK安装更加自动化,用户可以下载安装Oracle JDK或者

    OpenJDK。只安装JRE是不够的,用户应该下载整个JDK。

    安装过程十分简单,运行二进制可执行文件即可,程序会自动配置环境变量。

    (2)安装Cygwin

    Cygwin[1]

    是在Windows平台下模拟Linux环境的一个非常有用的工具,只有通过它才可

    以在Windows环境下安装Hadoop和Spark。具体安装步骤如下。

    1)运行安装程序,选择install from internet。

    2)选择网络最好的下载源进行下载。

    3)进入Select Packages界面(见图2-2),然后进入Net,选择openssl及openssh。因

    为之后还是会用到ssh无密钥登录的。图2-2 Cygwin安装选择界面

    另外应该安装“Editors Category”下面的“vim”。这样就可以在Cygwin上方便地修改配置

    文件。

    最后需要配置环境变量,依次选择“我的电脑”→“属性”→“高级系统设置”→“环境变

    量”命令,更新环境变量中的path设置,在其后添加Cygwin的bin目录和Cygwin的usr\bin两

    个目录。

    (3)安装sshd并配置免密码登录

    1)双击桌面上的Cygwin图标,启动Cygwin,执行ssh-host-config-y命令,出现如图2-3

    所示的界面。图2-3 Cygwin安装sshd选择界面

    2)执行后,提示输入密码,否则会退出该配置,此时输入密码和确认密码,按回车

    键。最后出现Host configuration finished.Have fun!表示安装成功。

    3)输入net start sshd,启动服务。或者在系统的服务中找到并启动Cygwin sshd服务。

    注意,如果是Windows 8操作系统,启动Cygwin时,需要以管理员身份运行(右击图标,选择以管理员身份运行),否则会因为权限问题,提示“发生系统错误5”。

    (4)配置SSH免密码登录

    1)执行ssh-keygen命令生成密钥文件,如图2-4所示。

    图2-4 Cygwin ssh生成密钥

    2)执行此命令后,在你的Cygwin\home\用户名路径下面会生成.ssh文件夹,可以通过

    命令ls-ahome用户名查看,通过ssh-version命令查看版本。

    3)执行完ssh-keygen命令后,再执行下面命令,生成authorized_keys文件。

    cd ~.ssh

    cp id_dsa.pub authorized_keys

    这样就配置好了sshd服务。

    (5)配置Hadoop修改和配置相关文件与Linux的配置一致,读者可以参照上文Linux中的配置方式,这里

    不再赘述。

    (6)配置Spark

    修改和配置相关文件与Linux的配置一致,读者可以参照上文Linux中的配置方式,这里

    不再赘述。

    (7)运行Spark

    1)Spark的启动与关闭

    ①在Spark根目录启动Spark。

    .sbinstart-all.sh

    ②关闭Spark。

    .sbinstop-all.sh

    2)Hadoop的启动与关闭

    ①在Hadoop根目录启动Hadoop。

    .sbinstart-all.sh

    ②关闭Hadoop。

    .sbinstop-all.sh

    3)检测是否安装成功

    正常状态下会出现如下内容。-bash-4.1 jps

    23526 Jps

    2127 Master

    7396 NameNode

    7594 SecondaryNameNode

    7681 ResourceManager

    1053 DataNode

    31935 NodeManager

    1405 Worker

    如缺少进程请到logs文件夹下查看相应日志,针对具体问题进行解决。

    [1] 可以通过官网进行下载:http:www.cygwin.com。2.2 Spark集群初试

    假设已经按照上述步骤配置完成Spark集群,可以通过两种方式运行Spark中的样例。下

    面以Spark项目中的SparkPi为例,可以用以下方式执行样例。

    1)以.run-example的方式执行

    用户可以按照下面的命令执行Spark样例。

    .binrun-example org.apache.spark.examples.SparkPi

    2)以.Spark Shell的方式执行

    Spark自带交互式的Shell程序,方便用户进行交互式编程。下面进入Spark Shell的交互

    式界面。

    .binspark-shell

    用户可以将下面的例子复制进Spark Shell中执行。

    importscala.math.random

    importorg.apache.spark._

    objectSparkPi {

    def main(args: Array[String]) {

    val slices = 2

    val n = 100000 slices

    val count = sc.parallelize(1 to n, slices).map { i =>

    val x = random 2 - 1

    val y = random 2 - 1

    if (xx + yy < 1) 1 else 0

    }.reduce(_ + _)

    println(Pi is roughly + 4.0 count n)

    }

    }

    按回车键执行上述命令。

    注意,Spark Shell中已经默认将SparkContext类初始化为对象sc。用户代码如果需要用

    到,则直接应用sc即可,否则用户自己再初始化,就会出现端口占用问题,相当于启动两个上下文。

    3)通过Web UI查看集群状态

    浏览器输入http:masterIP:8080,也可以观察到集群的整个状态是否正常,如图2-5所

    示。集群会显示与图2-5类似的画面。masterIP配置为用户的Spark集群的主节点IP。

    图2-5 Spark Web UI2.3 本章小结

    本章主要介绍了如何在Linux和Windows环境下安装部署Spark集群。

    由于Spark主要使用HDFS充当持久化层,所以完整地使用Spark需要预先安装Hadoop。

    通过本章介绍,读者就可以开启Spark的实战之旅了。

    下一章将介绍Spark的计算模型,Spark将分布式的内存数据抽象为弹性分布式数据集

    (RDD),并在其上实现了丰富的算子,从而对RDD进行计算,最后将算子序列转化为有向

    无环图进行执行和调度。第3章 Spark计算模型

    创新都是站在巨人的肩膀上产生的,在大数据领域也不例外。微软的Dryad使用DAG执

    行模式、子任务自由组合的范型。该范型虽稍显复杂,但较为灵活。Pig也针对大关系表的

    处理提出了很多有创意的处理方式,如flatten、cogroup。经典虽难以突破,但作为后继者

    的Spark借鉴经典范式并进行创新。经过实践检验,Spark的编程范型在处理大数据时显得简

    单有效。的数据处理与传输模式也大获全胜。

    Spark站在巨人的肩膀上,依靠Scala强有力的函数式编程、Actor通信模式、闭包、容

    器、泛型,借助统一资源分配调度框架Mesos,融合了MapReduce和Dryad,最后产生了一

    个简洁、直观、灵活、高效的大数据分布式处理框架。

    与Hadoop不同,Spark一开始就瞄准性能,将数据(包括部分中间数据)放在内存,在

    内存中计算。用户将重复利用的数据缓存到内存,提高下次的计算效率,因此Spark尤其适

    合迭代型和交互型任务。Spark需要大量的内存,但性能可随着机器数目呈多线性增长。本

    章将介绍Spark的计算模型。3.1 Spark程序模型

    下面通过一个经典的示例程序来初步了解Spark的计算模型,过程如下。

    1)SparkContext中的textFile函数从HDFS[1]

    读取日志文件,输出变量file[2]。

    val file=sc.textFile(hdfs:xxx)

    2)RDD中的filter函数过滤带“ERROR”的行,输出errors(errors也是一个RDD)。

    val errors=file.filter(line=>line.contains(ERROR)

    3)RDD的count函数返回“ERROR”的行数:errors.count。

    RDD操作起来与Scala集合类型没有太大差别,这就是Spark追求的目标:像编写单机程

    序一样编写分布式程序,但它们的数据和运行模型有很大的不同,用户需要具备更强的系统

    把控能力和分布式系统知识。

    从RDD的转换和存储角度看这个过程,如图3-1所示。图3-1 Spark程序模型

    在图3-1中,用户程序对RDD通过多个函数进行操作,将RDD进行转换。Block-Manager

    管理RDD的物理分区,每个Block就是节点上对应的一个数据块,可以存储在内存或者磁

    盘。而RDD中的partition是一个逻辑数据块,对应相应的物理块Block。本质上一个RDD在代

    码中相当于是数据的一个元数据结构,存储着数据分区及其逻辑结构映射关系,存储着RDD

    之前的依赖转换关系。

    [1] 也可以是本地文件或者其他的持久化层,如Hive等。

    [2] file是一个RDD,数据项是文件中的每行数据。3.2 弹性分布式数据集

    本节简单介绍RDD,并介绍RDD与分布式共享内存的异同。3.2.1 RDD简介

    在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(resilient

    distributed dataset,RDD),它是逻辑集中的实体,在集群中的多台机器上进行了数据分

    区。通过对多台机器上不同RDD分区的控制,就能够减少机器之间的数据重排(data

    shuffling)。Spark提供了“partitionBy”运算符,能够通过集群中多台机器之间对原始RDD进

    行数据再分配来创建一个新的RDD。RDD是Spark的核心数据结构,通过RDD的依赖关系形

    成Spark的调度顺序。通过对RDD的操作形成整个Spark程序。

    (1)RDD的两种创建方式

    1)从Hadoop文件系统(或与Hadoop兼容的其他持久化存储系统,如Hive、Cassandra、Hbase)输入(如HDFS)创建。

    2)从父RDD转换得到新的RDD。

    (2)RDD的两种操作算子

    对于RDD可以有两种计算操作算子:Transformation(变换)与Action(行动)。

    1)Transformation(变换)。

    Transformation操作是延迟计算的,也就是说从一个RDD转换生成另一个RDD的转换操

    作不是马上执行,需要等到有Actions操作时,才真正触发运算。

    2)Action(行动)

    Action算子会触发Spark提交作业(Job),并将数据输出到Spark系统。

    (3)RDD的重要内部属性1)分区列表。

    2)计算每个分片的函数。

    3)对父RDD的依赖列表。

    4)对Key-Value对数据类型RDD的分区器,控制分区策略和分区数。

    5)每个数据分区的地址列表(如HDFS上的数据块的地址)。3.2.2 RDD与分布式共享内存的异同

    RDD是一种分布式的内存抽象,表3-1列出了RDD与分布式共享内存(Distributed

    Shared Memory,DSM)的对比。在DSM系统[1]

    中,应用可以向全局地址空间的任意位置进

    行读写操作。DSM是一种通用的内存数据抽象,但这种通用性同时也使其在商用集群上实现

    有效的容错性和一致性更加困难。

    RDD与DSM主要区别在于[2]

    ,不仅可以通过批量转换创建(即“写”)RDD,还可以对任

    意内存位置读写。RDD限制应用执行批量写操作,这样有利于实现有效的容错。特别是,由

    于RDD可以使用Lineage(血统)来恢复分区,基本没有检查点开销。失效时只需要重新计

    算丢失的那些RDD分区,就可以在不同节点上并行执行,而不需要回滚(Roll Back)整个程

    序。

    表3-1 RDD与DSM的对比

    通过备份任务的复制,RDD还可以处理落后任务(即运行很慢的节点),这点与

    MapReduce类似,DSM则难以实现备份任务,因为任务及其副本均需读写同一个内存位置的

    数据。

    与DSM相比,RDD模型有两个优势。第一,对于RDD中的批量操作,运行时将根据数据

    存放的位置来调度任务,从而提高性能。第二,对于扫描类型操作,如果内存不足以缓存整

    个RDD,就进行部分缓存,将内存容纳不下的分区存储到磁盘上。

    另外,RDD支持粗粒度和细粒度的读操作。RDD上的很多函数操作(如count和collect

    等)都是批量读操作,即扫描整个数据集,可以将任务分配到距离数据最近的节点上。同时,RDD也支持细粒度操作,即在哈希或范围分区的RDD上执行关键字查找。

    后续将算子从两个维度结合在3.3节对RDD算子进行详细介绍。

    1)Transformations(变换)和Action(行动)算子维度。

    2)在Transformations算子中再将数据类型维度细分为:Value数据类型和Key-Value对

    数据类型的Transformations算子。Value型数据的算子封装在RDD类中可以直接使用,Key-

    Value对数据类型的算子封装于PairRDDFunctions类中,用户需要引入import

    org.apache.spark.SparkContext._才能够使用。进行这样的细分是由于不同的数据类型处理

    思想不太一样,同时有些算子是不同的。

    [1] 注意,这里的DSM,不仅指传统的共享内存系统,还包括那些通过分布式哈希表或分布

    式文件系统进行数据共享的系统,如Piccolo。

    [2] 参见论文:Resilient Distributed Datasets:A Fault-Tolerant Abstraction for In-Memory

    Cluster Computing。3.2.3 Spark的数据存储

    Spark数据存储的核心是弹性分布式数据集(RDD)。RDD可以被抽象地理解为一个大

    的数组(Array),但是这个数组是分布在集群上的。逻辑上RDD的每个分区叫一个

    Partition。

    在Spark的执行过程中,RDD经历一个个的Transfomation算子之后,最后通过Action算

    子进行触发操作。逻辑上每经历一次变换,就会将RDD转换为一个新的RDD,RDD之间通过

    Lineage产生依赖关系,这个关系在容错中有很重要的作用。变换的输入和输出都是RDD。

    RDD会被划分成很多的分区分布到集群的多个节点中。分区是个逻辑概念,变换前后的新旧

    分区在物理上可能是同一块内存存储。这是很重要的优化,以防止函数式数据不变性

    (immutable)导致的内存需求无限扩张。有些RDD是计算的中间结果,其分区并不一定有

    相应的内存或磁盘数据与之对应,如果要迭代使用数据,可以调cache函数缓存数据。

    图3-2为RDD的数据存储模型。

    图3-2 RDD数据管理模型图3-2中的RDD_1含有5个分区(p1、p2、p3、p4、p5),分别存储在4个节点

    (Node1、node2、Node3、Node4)中。RDD_2含有3个分区(p1、p2、p3),分布在3个

    节点(Node1、Node2、Node3)中。

    在物理上,RDD对象实质上是一个元数据结构,存储着Block、Node等的映射关系,以

    及其他的元数据信息。一个RDD就是一组分区,在物理数据存储上,RDD的每个分区对应的

    就是一个Block,Block可以存储在内存,当内存不够时可以存储到磁盘上。

    每个Block中存储着RDD所有数据项的一个子集,暴露给用户的可以是一个Block的迭代

    器(例如,用户可以通过mapPartitions获得分区迭代器进行操作),也可以就是一个数据项

    (例如,通过map函数对每个数据项并行计算)。本书会在后面章节具体介绍数据管理的底

    层实现细节。

    如果是从HDFS等外部存储作为输入数据源,数据按照HDFS中的数据分布策略进行数据

    分区,HDFS中的一个Block对应Spark的一个分区。同时Spark支持重分区,数据通过Spark

    默认的或者用户自定义的分区器决定数据块分布在哪些节点。例如,支持Hash分区(按照

    数据项的Key值取Hash值,Hash值相同的元素放入同一个分区之内)和Range分区(将属于

    同一数据范围的数据放入同一分区)等分区策略。

    下面具体介绍这些算子的功能。3.3 Spark算子分类及功能

    本节将主要介绍Spark算子的作用,以及算子的分类。

    1.Saprk算子的作用

    图3-3描述了Spark的输入、运行转换、输出。在运行转换中通过算子对RDD进行转换。

    算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作。

    图3-3 Spark算子和数据空间

    1)输入:在Spark程序运行中,数据从外部数据空间(如分布式存储:textFile读取

    HDFS等,parallelize方法输入Scala集合或数据)输入Spark,数据进入Spark运行时数据空

    间,转化为Spark中的数据块,通过BlockManager进行管理。

    2)运行:在Spark数据输入形成RDD后便可以通过变换算子,如fliter等,对数据进行操

    作并将RDD转化为新的RDD,通过Action算子,触发Spark提交作业。如果数据需要复用,可以通过Cache算子,将数据缓存到内存。3)输出:程序运行结束数据会输出Spark运行时空间,存储到分布式存储中(如

    saveAsTextFile输出到HDFS),或Scala数据或集合中(collect输出到Scala集合,count返回

    Scala int型数据)。

    Spark的核心数据模型是RDD,但RDD是个抽象类,具体由各子类实现,如

    MappedRDD、ShuffledRDD等子类。Spark将常用的大数据操作都转化成为RDD的子类。

    2.算子的分类

    大致可以分为三大类算子。

    1)Value数据类型的Transformation算子,这种变换并不触发提交作业,针对处理的数

    据项是Value型的数据。

    2)Key-Value数据类型的Transfromation算子,这种变换并不触发提交作业,针对处理

    的数据项是Key-Value型的数据对。

    3)Action算子,这类算子会触发SparkContext提交Job作业。

    下面分别对这3类算子进行详细介绍。3.3.1 Value型Transformation算子

    处理数据类型为Value型的Transformation算子可以根据RDD变换算子的输入分区与输

    出分区关系分为以下几种类型。

    1)输入分区与输出分区一对一型。

    2)输入分区与输出分区多对一型。

    3)输入分区与输出分区多对多型。

    4)输出分区为输入分区子集型。

    5)还有一种特殊的输入与输出分区一对一的算子类型:Cache型。Cache算子对RDD分

    区进行缓存。

    1.输入分区与输出分区一对一型

    (1)map

    将原来RDD的每个数据项通过map中的用户自定义函数f映射转变为一个新的元素。源

    码中的map算子相当于初始化一个RDD,新RDD叫作MappedRDD(this,sc.clean(f))。图3-4 map算子对RDD转换

    图3-4中的每个方框表示一个RDD分区,左侧的分区经过用户自定义函数f:T->U映射为

    右侧的新的RDD分区。但是实际只有等到Action算子触发后,这个f函数才会和其他函数在一

    个Stage中对数据进行运算。V1输入f转换输出V’1。

    (2)flatMap

    将原来RDD中的每个元素通过函数f转换为新的元素,并将生成的RDD的每个集合中的

    元素合并为一个集合。内部创建FlatMappedRDD(this,sc.clean(f))。

    图3-5中小方框表示RDD的一个分区,对分区进行flatMap函数操作,flatMap中传入的函

    数为f:T->U,T和U可以是任意的数据类型。将分区中的数据通过用户自定义函数f转换为新

    的数据。外部大方框可以认为是一个RDD分区,小方框代表一个集合。V1、V2、V3在一个

    集合作为RDD的一个数据项,转换为V’1、V’2、V’3后,将结合拆散,形成为RDD中的数据

    项。

    图3-5 flapMap算子对RDD转换

    (3)mapPartitionsmapPartitions函数获取到每个分区的迭代器,在函数中通过这个分区整体的迭代器对整

    个分区的元素进行操作。内部实现是生成MapPartitionsRDD。图3-6中的方框代表一个RDD

    分区。

    图3-6中,用户通过函数f(iter)=>iter.filter(_>=3)对分区中的所有数据进行过

    滤,>=3的数据保留。一个方块代表一个RDD分区,含有1、2、3的分区过滤只剩下元素

    3。

    图3-6 mapPartitions算子对RDD转换

    (4)glom

    glom函数将每个分区形成一个数组,内部实现是返回的GlommedRDD。图3-7中的每个

    方框代表一个RDD分区。

    图3-7中的方框代表一个分区。该图表示含有V1、V2、V3的分区通过函数glom形成一个

    数组Array[(V1),(V2),(V3)]。图3-7 glom算子对RDD转换

    2.输入分区与输出分区多对一型

    (1)union

    使用union函数时需要保证两个RDD元素的数据类型相同,返回的RDD数据类型和被合

    并的RDD元素数据类型相同,并不进行去重操作,保存所有元素。如果想去重,可以使用

    distinct。++符号相当于uion函数操作。图3-8 union算子对RDD转换

    图3-8中左侧的大方框代表两个RDD,大方框内的小方框代表RDD的分区。右侧大方框

    代表合并后的RDD,大方框内的小方框代表分区。含有V1,V2…U4的RDD和含有V1,V8…

    U8的RDD合并所有元素形成一个RDD。V1、V1、V2、V8形成一个分区,其他元素同理进行

    合并。

    (2)cartesian

    对两个RDD内的所有元素进行笛卡尔积操作。操作后,内部实现返回CartesianRDD。

    图3-9中左侧的大方框代表两个RDD,大方框内的小方框代表RDD的分区。右侧大方框代表合并后的RDD,大方框内的小方框代表分区。

    图3-9 cartesian算子对RDD转换

    图3-9中的大方框代表RDD,大方框中的小方框代表RDD分区。例如,V1和另一个RDD

    中的W1、W2、Q5进行笛卡尔积运算形成(V1,W1)、(V1,W2)、(V1,Q5)。

    3.输入分区与输出分区多对多型

    groupBy:将元素通过函数生成相应的Key,数据就转化为Key-Value格式,之后将Key

    相同的元素分为一组。

    函数实现如下。

    ①sc.clean函数将用户函数预处理:val cleanF = sc.clean(f)

    ②对数据map进行函数操作,最后再对groupByKey进行分组操作。

    this.map(t => (cleanF(t), t)).groupByKey(p)

    其中,p中确定了分区个数和分区函数,也就决定了并行化的程度。图3-10中的方框代

    表RDD分区。

    图3-10中的方框代表一个RDD分区,相同key的元素合并到一个组。例如,V1,V2合并

    为一个Key-Value对,其中key为“V”,Value为“V1,V2”,形成V,Seq(V1,V2)。

    图3-10 groupBy算子对RDD转换

    4.输出分区为输入分区子集型

    (1)filter

    filter的功能是对元素进行过滤,对每个元素应用f函数,返回值为true的元素在RDD中保留,返回为false的将过滤掉。内部实现相当于生成FilteredRDD(this,sc.clean(f))。

    下面代码为函数的本质实现。

    def filter(f:T=>Boolean):RDD[T]=new FilteredRDD(this,sc.clean(f))

    图3-11中的每个方框代表一个RDD分区。T可以是任意的类型。通过用户自定义的过滤

    函数f,对每个数据项进行操作,将满足条件,返回结果为true的数据项保留。例如,过滤掉

    V2、V3保留了V1,将区分命名为V1'。

    图3-11 filter算子对RDD转换

    (2)distinct

    distinct将RDD中的元素进行去重操作。图3-12中的方框代表RDD分区。

    图3-12中的每个方框代表一个分区,通过distinct函数,将数据去重。例如,重复数据

    V1、V1去重后只保留一份V1。图3-12 distinct算子对RDD转换

    (3)subtract

    subtract相当于进行集合的差操作,RDD 1去除RDD 1和RDD 2交集中的所有元素。

    图3-13中左侧的大方框代表两个RDD,大方框内的小方框代表RDD的分区。右侧大方框

    代表合并后的RDD,大方框内的小方框代表分区。V1在两个RDD中均有,根据差集运算规

    则,新RDD不保留,V2在第一个RDD有,第二个RDD没有,则在新RDD元素中包含V2。图3-13 subtract算子对RDD转换

    (4)sample

    sample将RDD这个集合内的元素进行采样,获取所有元素的子集。用户可以设定是否

    有放回的抽样、百分比、随机种子,进而决定采样方式。

    内部实现是生成SampledRDD(withReplacement,fraction,seed)。

    函数参数设置如下。

    ·withReplacement=true,表示有放回的抽样;

    ·withReplacement=false,表示无放回的抽样。图3-14 sample算子对RDD转换

    图3-14中的每个方框是一个RDD分区。通过sample函数,采样50%的数据。V1、V2、U1、U2、U3、U4采样出数据V1和U1、U2,形成新的RDD。

    (5)takeSample

    takeSample函数和上面的sample函数是一个原理,但是不使用相对比例采样,而

    是按设定的采样个数进行采样,同时返回结果不再是RDD,而是相当于对采样后的数据进行

    Collect,返回结果的集合为单机的数组。

    图3-15中左侧的方框代表分布式的各个节点上的分区,右侧方框代表单机上返回的结果

    数组。通过takeSample对数据采样,设置为采样一份数据,返回结果为V1。

    5.Cache型

    (1)cache

    cache将RDD元素从磁盘缓存到内存,相当于persist(MEMORY_ONLY)函数的功能。

    图3-14中的方框代表RDD分区。

    图3-16中的每个方框代表一个RDD分区,左侧相当于数据分区都存储在磁盘,通过cache算子将数据缓存在内存。

    图3-15 takeSample算子对RDD转换

    图3-16 cache算子对RDD转换

    (2)persist

    persist函数对RDD进行缓存操作。数据缓存在哪里由StorageLevel枚举类型确定。有以

    下几种类型的组合(见图3-15),DISK代表磁盘,MEMORY代表内存,SER代表数据是否进

    行序列化存储。

    下面为函数定义,StorageLevel是枚举类型,代表存储模式,用户可以通过图3-17按需

    选择。persist(newLevel:StorageLevel)

    图3-17中列出persist函数可以缓存的模式。例如,MEMORY_AND_DISK_SER代表数据

    可以存储在内存和磁盘,并且以序列化的方式存储。其他同理。

    图3-17 persist算子对RDD转换

    图3-18中的方框代表RDD分区。disk代表存储在磁盘,mem代表存储在内存。数据最初

    全部存储在磁盘,通过persist(MEMORY_AND_DISK)将数据缓存到内存,但是有的分区无

    法容纳在内存,例如:图3-18中将含有V1,V2,V3的RDD存储到磁盘,将含有U1,U2的

    RDD仍旧存储在内存。

    图3-18 Persist算子对RDD转换3.3.2 Key-Value型Transformation算子

    Transformation处理的数据为Key-Value形式的算子,大致可以分为3种类型:输入分区

    与输出分区一对一、聚集、连接操作。

    1.输入分区与输出分区一对一

    mapValues:针对(Key,Value)型数据中的Value进行Map操作,而不对Key进行处

    理。

    图3-19中的方框代表RDD分区。a=>a+2代表只对(V1,1)数据中的1进行加2操作,返回结果为3。

    图3-19 mapValues算子RDD对转换

    2.对单个RDD或两个RDD聚集

    (1)单个RDD聚集

    1)combineByKey。

    定义combineByKey算子的代码如下。combineByKey[C](createCombiner:(V)=> C,mergeValue:(C, V)=> C,mergeCombiners:(C, C)=> C,partitioner: Partitioner

    mapSideCombine: Boolean = true,serializer: Serializer =null): RDD[(K, C)]

    说明:

    ·createCombiner:V=>C,在C不存在的情况下,如通过V创建seq C。

    ·mergeValue:(C,V)=>C,当C已经存在的情况下,需要merge,如把item V加到

    seq C中,或者叠加。

    ·mergeCombiners:(C,C)=>C,合并两个C。

    ·partitioner:Partitioner(分区器),Shuffle时需要通过Partitioner的分区策略进行分

    区。

    ·mapSideCombine:Boolean=true,为了减小传输量,很多combine可以在map端先

    做。例如,叠加可以先在一个partition中把所有相同的Key的Value叠加,再shuffle。

    ·serializerClass:String=null,传输需要序列化,用户可以自定义序列化类。

    例如,相当于将元素为(Int,Int)的RDD转变为了(Int,Seq[Int])类型元素的

    RDD。

    图3-20中的方框代表RDD分区。通过combineByKey,将(V1,2)、(V1,1)数据合

    并为(V1,Seq(2,1))。图3-20 comBineByKey算子对RDD转换

    2)reduceByKey。

    reduceByKey是更简单的一种情况,只是两个值合并成一个值,所以createCombiner很

    简单,就是直接返回v,而mergeValue和mergeCombiners的逻辑相同,没有区别。

    函数实现代码如下。

    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {

    combineByKey[V]((v: V) => v, func, func, partitioner)

    }

    图3-21中的方框代表RDD分区。通过用户自定义函数(A,B)=>(A+B),将相同

    Key的数据(V1,2)、(V1,1)的value相加,结果为(V1,3)。图3-21 reduceByKey算子对RDD转换

    3)partitionBy。

    partitionBy函数对RDD进行分区操作。

    函数定义如下。

    partitionBy(partitioner:Partitioner)

    如果原有RDD的分区器和现有分区器(partitioner)一致,则不重分区,如果不一致,则相当于根据分区器生成一个新的ShuffledRDD。

    图3-22中的方框代表RDD分区。通过新的分区策略将原来在不同分区的V1、V2数据都

    合并到了一个分区。图3-22 partitionBy算子对RDD转换

    (2)对两个RDD进行聚集

    cogroup函数将两个RDD进行协同划分,cogroup函数的定义如下。

    cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]

    对在两个RDD中的Key-Value类型的元素,每个RDD相同Key的元素分别聚合为一个集

    合,并且返回两个RDD中对应Key的元素集合的迭代器。

    (K, (Iterable[V], Iterable[W]))

    其中,Key和Value,Value是两个RDD下相同Key的两个数据集合的迭代器所构成的元

    组。

    图3-23中的大方框代表RDD,大方框内的小方框代表RDD中的分区。将RDD1中的数据

    (U1,1)、(U1,2)和RDD2中的数据(U1,2)合并为(U1,((1,2),(2)))。图3-23 Cogroup算子对RDD转换

    3.连接

    (1)join

    join对两个需要连接的RDD进行cogroup函数操作,cogroup原理请见上文。cogroup操

    作之后形成的新RDD,对每个key下的元素进行笛卡尔积操作,返回的结果再展平,对应

    Key下的所有元组形成一个集合,最后返回RDD[(K,(V,W))]

    下面代码为join的函数实现,本质是通过cogroup算子先进行协同划分,再通过

    flatMapValues将合并的数据打散。

    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>

    for (v <- vs; w <- ws) yield (v, w) }

    图3-24是对两个RDD的join操作示意图。大方框代表RDD,小方框代表RDD中的分区。

    函数对拥有相同Key的元素(例如V1)为Key,以做连接后的数据结果为(V1,(1,1))

    和(V1,(1,2))。图3-24 join算子对RDD转换

    (2)leftOutJoin和rightOutJoin

    LeftOutJoin(左外连接)和RightOutJoin(右外连接)相当于在join的基础上先判断一

    侧的RDD元素是否为空,如果为空,则填充为空。如果不为空,则将数据进行连接运算,并

    返回结果。

    下面代码是leftOutJoin的实现。

    if (ws.isEmpty) {

    vs.map(v => (v, None))

    } else {

    for (v <- vs; w <- ws) yield (v, Some(w))

    }3.3.3 Actions算子

    本质上在Actions算子中通过SparkContext执行提交作业的runJob操作,触发了RDD

    DAG的执行。

    例如,Actions算子collect函数的代码如下,感兴趣的读者可以顺着这个入口进行源码剖

    析。

    返回这个RDD的所有数据,结果以数组形式存储

    def collect: Array[T] = {

    提交Job

    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)

    Array.concat(results: _)

    }

    下面根据Action算子的输出空间将Action算子进行分类:无输出、HDFS、Scala集合和

    数据类型。

    1.无输出

    (1)foreach

    对RDD中的每个元素都应用f函数操作,不返回RDD和Array,而是返回Uint。

    图3-25表示foreach算子通过用户自定义函数对每个数据项进行操作。本例中自定义函

    数为println,控制台打印所有数据项。

    2.HDFS

    (1)saveAsTextFile

    函数将数据输出,存储到HDFS的指定目录。

    下面为函数的内部实现。this.map(x => (NullWritable.get, new Text(x.toString)))

    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)

    将RDD中的每个元素映射转变为(Null,x.toString),然后再将其写入HDFS。

    图3-26中左侧的方框代表RDD分区,右侧方框代表HDFS的Block。通过函数将RDD的每

    个分区存储为HDFS中的一个Block。

    图3-25 foreach算子对RDD转换

    图3-26 saveAsHadoopFile算子对RDD转换

    (2)saveAsObjectFile

    saveAsObjectFile将分区中的每10个元素组成一个Array,然后将这个Array序列化,映

    射为(Null,BytesWritable(Y))的元素,写入HDFS为SequenceFile的格式。下面代码为函数内部实现。

    map(x=>(NullWritable.get,new BytesWritable(Utils.serialize(x))))

    图3-27中的左侧方框代表RDD分区,右侧方框代表HDFS的Block。通过函数将RDD的每

    个分区存储为HDFS上的一个Block。

    图3-27 saveAsObjectFile算子对RDD转换

    3.Scala集合和数据类型(1)collect

    collect相当于toArray,toArray已经过时不推荐使用,collect将分布式的RDD返回为一

    个单机的scala Array数组。在这个数组上运用scala的函数式操作。

    图3-28中的左侧方框代表RDD分区,右侧方框代表单机内存中的数组。通过函数操作,将结果返回到Driver程序所在的节点,以数组形式存储。

    (2)collectAsMap

    collectAsMap对(K,V)型的RDD数据返回一个单机HashMap。对于重复K的RDD元

    素,后面的元素覆盖前面的元素。

    图3-29中的左侧方框代表RDD分区,右侧方框代表单机数组。数据通过collectAsMap函

    数返回给Driver程序计算结果,结果以HashMap形式存储。

    图3-28 Collect算子对RDD转换图3-29 collectAsMap算子对RDD转换

    (3)reduceByKeyLocally

    实现的是先reduce再collectAsMap的功能,先对RDD的整体进行reduce操作,然后再收

    集所有结果返回为一个HashMap。

    (4)lookup

    下面代码为lookup的声明。

    lookup(key:K):Seq[V]

    Lookup函数对(Key,Value)型的RDD操作,返回指定Key对应的元素形成的Seq。这

    个函数处理优化的部分在于,如果这个RDD包含分区器,则只会对应处理K所在的分区,然

    后返回由(K,V)形成的Seq。如果RDD不包含分区器,则需要对全RDD元素进行暴力扫描

    处理,搜索指定K对应的元素。

    图3-30中的左侧方框代表RDD分区,右侧方框代表Seq,最后结果返回到Driver所在节

    点的应用中。

    (5)countcount返回整个RDD的元素个数。内部函数实现如下。

    Def count:Long=sc.runJob(this,Utils.getIteratorSize_).sum

    在图3-31中,返回数据的个数为5。一个方块代表一个RDD分区。

    图3-30 lookup对RDD转换

    图3-31 count对RDD转换

    (6)top

    top可返回最大的k个元素。函数定义如下。

    top(num:Int)(implicit ord:Ordering[T]):Array[T]相近函数说明如下。

    ·top返回最大的k个元素。

    ·take返回最小的k个元素。

    ·takeOrdered返回最小的k个元素,并且在返回的数组中保持元素的顺序。

    ·first相当于top(1)返回整个RDD中的前k个元素,可以定义排序的方式Ordering[T]。

    返回的是一个含前k个元素的数组。

    (7)reduce

    reduce函数相当于对RDD中的元素进行reduceLeft函数的操作。函数实现如下。

    Some(iter.reduceLeft(cleanF))

    reduceLeft先对两个元素进行reduce函数操作,然后将结果和迭代器取出的下

    一个元素进行reduce函数操作,直到迭代器遍历完所有元素,得到最后结果。

    在RDD中,先对每个分区中的所有元素的集合分别进行reduceLeft。每个分区

    形成的结果相当于一个元素,再对这个结果集合进行reduceleft操作。

    例如:用户自定义函数如下。

    f:(A,B)=>(A._1+@+B._1,A._2+B._2)

    图3-32中的方框代表一个RDD分区,通过用户自定函数f将数据进行reduce运算。示例

    最后的返回结果为V1@[1]

    V2U!@U2@U3@U4,12。图3-32 reduce算子对RDD转换

    (8)fold

    fold和reduce的原理相同,但是与reduce不同,相当于每个reduce时,迭代器取的第一

    个元素是zeroValue。

    图3-33中通过下面的用户自定义函数进行fold运算,图中的一个方框代表一个RDD分

    区。读者可以参照(7)reduce函数理解。

    fold((V0@,2))( (A,B)=>(A._1+@+B._1,A._2+B._2))

    图3-33 fold算子对RDD转换(9)aggregate

    aggregate先对每个分区的所有元素进行aggregate操作,再对分区的结果进行fold操

    作。

    aggreagate与fold和reduce的不同之处在于,aggregate相当于采用归并的方式进行数

    据聚集,这种聚集是并行化的。而在fold和reduce函数的运算过程中,每个分区中需要进行

    串行处理,每个分区串行计算完结果,结果再按之前的方式进行聚集,并返回最终聚集结

    果。

    函数的定义如下。

    aggregate[B](z: B)(seqop: (B,A) => B,combop: (B,B) => B): B

    图3-34通过用户自定义函数对RDD 进行aggregate的聚集操作,图中的每个方框代表

    一个RDD分区。

    rdd.aggregate(V0@,2)((A,B)=>(A._1+@+B._1,A._2+B._2)),(A,B)=>(A._1+@+B_1,A._@+B_.2))

    最后,介绍两个计算模型中的两个特殊变量。

    广播(broadcast)变量:其广泛用于广播Map Side Join中的小表,以及广播大变量等

    场景。这些数据集合在单节点内存能够容纳,不需要像RDD那样在节点之间打散存储。

    Spark运行时把广播变量数据发到各个节点,并保存下来,后续计算可以复用。相比Hadoop

    的distributed cache,广播的内容可以跨作业共享。Broadcast的底层实现采用了BT机制。

    有兴趣的读者可以参考论文[2]。图3-34 aggregate算子对RDD转换

    ②代表V。

    ③代表U。

    accumulator变量:允许做全局累加操作,如accumulator变量广泛使用在应用中记录当

    前的运行指标的情景。

    [1] @代表数据的分隔符,可替换为其他分隔符。

    [2] 参见:Mosharaf Chowdhury,Performance and Scalability of Broadcast in Spark。3.4 本章小结

    本章主要介绍了Spark的计算模型,Spark将应用程序整体翻译为一个有向无环图进行调

    度和执行。相比MapReduce,Spark提供了更加优化和复杂的执行流。

    读者还可以深入了解Spark的运行机制与Spark算子,这样能更加直观地了解API的使

    用。Spark提供了更加丰富的函数式算子,这样就为Spark上层组件的开发奠定了坚实的基

    础。

    通过阅读本章,读者可以对Spark计算模型进行更为宏观的把握。相信读者还想对Spark

    内部执行机制进行更深入的了解,下面章节就对Spark的内核进行更深入的剖析。第4章 Spark工作机制详解

    通过前一章的介绍,读者对Spark的计算模型有了全面的把握,本章将深入介绍Spark的

    内部运行机制。Spark的主要模块包括调度与任务分配、IO模块、通信控制模块、容错模块

    以及Shuffle模块。Spark按照应用、作业、Stage和Task几个层次分别进行调度,采用了经

    典的FIFO和FAIR等调度算法。在Spark的IO中,将数据以块为单位进行管理,需要处理的块

    可以存储在本机内存、磁盘或者集群中的其他机器中。集群中的通信对于命令和状态的传递

    极为重要,Spark通过AKKA框架进行集群消息通信。分布式系统中的容错十分重要,Spark

    通过Lineage(血统)和Checkpoint机制进行容错性保证。最后介绍Spark中的Shuffle机制,虽然Spark也借鉴了MapReduce模型,但其对Shuffle机制进行了创新与优化。下面开始Spark

    内部运行机制的探索。4.1 Spark应用执行机制

    下面介绍Spark的应用执行机制。4.1.1 Spark执行机制总览

    Spark应用提交后经历了一系列的转换,最后成为Task在每个节点上执行。Spark应用转

    换(见图4-1):RDD的Action算子触发Job的提交,提交到Spark中的Job生成RDD DAG,由

    DAGScheduler转化为Stage DAG,每个Stage中产生相应的Task集合,TaskScheduler将任

    务分发到Executor执行。每个任务对应相应的一个数据块,使用用户定义的函数处理数据

    块。

    图4-1 Spark应用转换流程

    Spark执行的底层实现原理,如图4-2所示。在Spark的底层实现中,通过RDD进行数据

    的管理,RDD中有一组分布在不同节点的数据块,当Spark的应用在对这个RDD进行操作

    时,调度器将包含操作的任务分发到指定的机器上执行,在计算节点通过多线程的方式执行

    任务。一个操作执行完毕,RDD便转换为另一个RDD,这样,用户的操作依次执行。Spark为了系统的内存不至于快速用完,使用延迟执行的方式执行,即只有操作累计到Action(行

    动),算子才会触发整个操作序列的执行,中间结果不会单独再重新分配内存,而是在同一

    个数据块上进行流水线操作。

    在集群的程序实现上,有一个重要的分布式数据结构,即弹性分布式数据集(Resilient

    Distributed Dataset,RDD)。Spark实现了分布式计算和任务处理,并实现了任务的分

    发、跟踪、执行等工作,最终聚合结果,完成Spark应用的计算。

    对RDD的块管理通过BlockManger完成,BlockManager将数据抽象为数据块,在内存或

    者磁盘进行存储,如果数据不在本节点,则还可以通过远端节点复制到本机进行计算。

    图4-2 Spark执行底层实现

    在计算节点的执行器Executor中会创建线程池,这个执行器将需要执行的任务通过线程

    池并发执行。4.1.2 Spark应用的概念

    Spark应用(Application)是用户提交的应用程序。执行模式有Local、Standalone、YARN、Mesos。根据Spark Application的Driver Program是否在集群中运行,Spark应用的运

    行方式又可以分为Cluster模式和Client模式。图4-3为Application包含的组件。

    应用的基本组件如下。

    ·Application:用户自定义的Spark程序,用户提交后,Spark为App分配资源,将程序转

    换并执行。

    ·Driver Program:运行Application的main函数并创建SparkContext。

    ·RDD Graph:RDD是Spark的核心结构,可以通过一系列算子进行操作(主要有

    Transformation和Action操作)。当RDD遇到Action算子时,将之前的所有算子形成一个有

    向无环图(DAG),也就是图中的RDD Graph。再在Spark中转化为Job,提交到集群执行。

    一个App中可以包含多个Job。

    ·Job:一个RDD Graph触发的作业,往往由Spark Action算子触发,在SparkContext中

    通过runJob方法向Spark提交Job。图4-3 Spark Application基本组件

    ·Stage:每个Job会根据RDD的宽依赖关系被切分很多Stage,每个Stage中包含一组相

    同的Task,这一组Task也叫TaskSet。

    ·Task:一个分区对应一个Task,Task执行RDD中对应Stage中包含的算子。Task被封

    装好后放入Executor的线程池中执行。4.1.3 应用提交与执行方式

    应用的提交包含以下两种方式。

    ·Driver进程运行在客户端,对应用进行管理监控。

    ·主节点指定某个Worker节点启动Driver,负责整个应用的监控。

    Driver进程是应用的主控进程,负责应用的解析、切分Stage并调度Task到Executor执

    行,包含DAGScheduler等重要对象。下面具体介绍这两种方式的原理。

    1.Driver在客户端运行

    例如,执行Spark自带的样例程序:.binrun-example

    org.apache.spark.examples.SparkTC spark:UserHostIP:port。

    应用执行和控制如图4-4所示。

    图4-4 Spark Driver位于Client作业执行流程描述如下。

    用户启动客户端,之后客户端运行用户程序,启动Driver进程。在Driver中启动或实例

    化DAGScheduler等组件。客户端的Driver向Master注册。

    Worker向Master注册,Master命令Worker启动Exeuctor。Worker通过创建

    ExecutorRunner线程,在ExecutorRunner线程内部启动ExecutorBackend进程。

    ExecutorBackend启动后,向客户端Driver进程内的SchedulerBackend注册,这样Driver

    进程就能找到计算资源。Driver的DAGScheduler解析应用中的RDD DAG并生成相应的

    Stage,每个Stage包含的TaskSet通过TaskScheduler分配给Executor。在Executor内部启动

    线程池并行化执行Task。

    2.Driver在Worker运行

    如果Driver在Worker启动执行需要通过org.apache.spark.deploy.Client类执行应用,命

    令如下。

    .binspark-class org.apache.spark.deploy.Client launch spark:UserHostIP:port

    file:your_jar org.apache.spark.examples.SparkTC spark:UserHostIP:port

    应用提交与执行机制如图4-5所示。

    应用执行流程如下。

    1)用户启动客户端,客户端提交应用程序给Master。

    2)Master调度应用,针对每个应用分发给指定的一个Worker启动Driver,即Scheduler-

    Backend。Worker接收到Master命令后创建DriverRunner线程,在DriverRunner线程内创建

    SchedulerBackend进程。Driver充当整个作业的主控进程。Master会指定其他Worker启动

    Exeuctor,即ExecutorBackend进程,提供计算资源。流程和上面很相似,Worker创建ExecutorRunner线程,ExecutorRunner会启动ExecutorBackend进程。

    图4-5 Spark Driver位于Worker节点的应用提交与执行机制

    3)ExecutorBackend启动后,向Driver的SchedulerBackend注册,这样Driver获取了计

    算资源就可以调度和将任务分发到计算节点执行。SchedulerBackend进程中包含

    DAGScheduler,它会根据RDD的DAG切分Stage,生成TaskSet,并调度和分发Task到

    Executor。对于每个Stage的TaskSet,都会被存放到TaskScheduler中。TaskScheduler将任

    务分发到Executor,执行多线程并行任务。4.2 Spark调度与任务分配模块

    系统设计很重要的一环便是资源调度。设计者将资源进行不同粒度的抽象建模,然后将

    资源统一放入调度器,通过一定的算法进行调度,最终达到高吞吐量或者低访问延迟的目

    的。Spark的调度器设计精良,扩展性极好,为它的后续发展奠定了很好的基础。

    Spark有多种运行模式,如Local模式、Standalone模式、YARN模式、Mesos模式。在集

    群环境下,为了减少复杂性,抓住系统主要脉络进行理解。本节主要介绍Standalone模式中

    的名词,其他运行模式中各角色实现的功能基本一致,只不过是在特定资源管理器下使用略

    为不同的名称和调度机制。

    在Standalone模式下,集群启动之后,使用jps命令在主节点会看到Master进程,在从

    节点会看到Worker进程。其中,Master负责接收客户端提交的作业,管理Worker。提供了

    Web UI呈现集群运行时状态信息,方便用户诊断性能问题。

    在Spark的应用提交之后,Spark调度应用。系统设计的一个核心就是调度。从Spark整

    体上看,调度可以分为4个级别,Application调度、Job调度、Stage的调度、Task的调度与

    分发。上节已经介绍了这4个概念和概念之间的对应关系。下面对这4个层级调度进行介绍。4.2.1 Spark应用程序之间的调度

    通过前面的介绍,读者了解到每个应用拥有对应的SparkContext.SparkContext维持整

    个应用的上下文信息,提供一些核心方法,如runJob可以提交Job。然后,通过主节点的分

    配获得独立的一组Executor JVM进程执行任务。Executor空间内的不同应用之间是不共享

    的,一个Executor在一个时间段内只能分配给一个应用使用。如果多用户需要共享集群资

    源,依据集群管理者的配置,用户可以通过不同的配置选项来分配管理资源。

    对集群管理者来说简单的配置方式就是静态配置资源分配规则。例如,在不同的运行模

    式下,用户可以通过配置文件中进行集群调度的配置。配置每个应用可以使用的最大资源总

    量、调度的优先级等。

    1.调度配置

    下面根据不同集群的运行模式配置调度[1]。

    (1)Standalone

    默认情况下,用户向以Standalone模式运行的Spark集群提交的应用使用FIFO(先进先

    出)的顺序进行调度。每个应用会独占所有可用节点的资源。用户可以通过配置参数

    spark.cores.max决定一个应用可以在整个集群申请的CPU core数。注意,这个参数不是控

    制单节点可用多少核。如果用户没有配置这个参数,则在Standalone模式下,默认每个应用

    可以分配由参数spark.deploy.defaultCores决定的可用核数。

    (2)Mesos

    如果用户在Mesos上使用Spark,并且想要静态地配置资源的分配策略,则可以通过配

    置参数spark.mesos.coarse为true,将Mesos配置为粗粒度调度模式。然后配置参数

    spark.cores.max来限制应用可以使用的CPU core的最大限额。同时用户应该对参数spark.executor.memory进行配置,进而限制每个Executor的内存使用量。Mesos中还可以配

    置动态共享CPU core的执行模式,用户只需要使用mesos:URL而不配置

    spark.mesos.coarse参数为true,就能以这种方式执行,使Mesos运行在细粒度调度模型

    下。在这种模式下,每个Spark应用程序还是会拥有独立和固定的内存分配,但是当应用占

    用的一些机器上不再运行任务,机器处于空闲状态时,其他机器可以使用这些机器上空闲的

    CPU core来执行任务,相当于复用空闲的CPU提升了资源利用率。这种模式在集群上再运行

    大量不活跃的应用情景下十分有用,如大量不同用户发起请求的场景。

    (3)YARN

    当Spark运行在YARN平台上时,用户可以在YARN的客户端通过配置--num-executors选

    项控制为这个应用分配多少个Executor,然后通过配置--executor-memory及--executor-

    cores来控制应用被分到的每个Executor的内存大小和Executor所占用的CPU核数。这样便可

    以限制用户提交的应用不会过多的占用资源,让不同用户能够共享整个集群资源,提升

    YARN吞吐量。

    注意:以上3种运行模式都不提供跨应用的共享内存。如果用户想共享内存数据,Spark

    官网推荐用户开发一个单机服务,这个服务可以接收多个对同一个RDD的查询请求,并返回

    结果,类似的Shark JDBC Server就是这样工作的,Spark SQL在新版本中也会提供这样的功

    能。目前版本,Spark SQL暂时使用的Shark Server2 Github地址为

    https:github.comamplabsharktreesparkSql。

    Shark的Github地址为:https:github.comamplabshark。

    2.FIFO的调度代码

    最后,读者可以参考下面源码,了解在Standalone模式中,集群是如何完成应用FIFO的

    调度的。Spark的应用接收提交和调度的代码在Master.scala文件中,在schedule方法中

    实现调度。private def schedule {……

    for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {

    for (driver <- List(waitingDrivers: _)) {

    if (worker.memoryFree >= driver.desc.mem worker.coresFree >= driver.desc.cores) {

    launchDriver(worker, driver)

    waitingDrivers -= driver

    }

    }

    }

    从源码中可以看到,Master先统计可用资源,然后在waitingDrivers的队列中通过FIFO

    方式为App分配资源和指定Worker启动Driver执行应用。

    [1] 参见:http:spark.apache.orgdocslatestjob-scheduling.htmlscheduling-across-

    applications。4.2.2 Spark应用程序内Job的调度

    在Spark应用程序内部,用户通过不同线程提交的Job可以并行运行,这里所说的Job就

    是Spark Action(如count、collect等)算子触发的整个RDD DAG为一个Job,在实现上,算

    子中的本质是调用SparkContext中的runJob提交了Job。例如,通过count的源码看这个过

    程。

    def count: Long = sc.runJob(this, Utils.getIteratorSize _).sum

    其中,sc就是SparkContext对象,调用runJob方法提交Job。

    Spark的调度器是完全线程安全的,并且支持一个应用处理多请求的用例(如多用户进

    行查询)。

    (1)FIFO模式

    在默认情况下,Spark的调度器以FIFO(先进先出)方式调度Job的执行,如图4-6所

    示。每个Job被切分为多个Stage。第一个Job优先获取所有可用的资源,接下来第二个Job再

    获取剩余资源。以此类推,如果第一个Job并没有占用所有的资源,则第二个Job还可以继续

    获取剩余资源,这样多个Job可以并行运行。如果第一个Job很大,占用所有资源,则第二个

    Job就需要等待第一个任务执行完,释放空余资源,再申请和分配Job。图4-6 FIFO模式调度示意图

    读者可以通过图示大致了解FIFO模式,下面通过源码更加深入地剖析FIFO模式。

    private[spark] class FIFOSchedulingAlgorithm extends

    SchedulingAlgorithm {

    override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {

    执行优先级

    val priority1 = s1.priority

    val priority2 = s2.priority

    var res = math.signum(priority1 - priority2)

    if (res == 0) {

    val stageId1 = s1.stageId

    val stageId2 = s2.stageId

    signum是符号函数。功能是如果参数为 0,则返回 0;如果参数大于 0,则返回 1.0;如果参数小于 0,则返回-1.0

    res = math.signum(stageId1 - stageId2)

    }

    if (res < 0) {

    true

    } else {

    false

    }

    }

    }

    在算法执行中,先看优先级,TaskSet的优先级是JobID,因为先提交的JobID小,所以

    就会被更优先地调度,这里相当于进行了两层排序,先看是否是同一个Job的Taskset,不同

    Job之间的TaskSet先排序。

    最后执行的stageId最小为0,最先应该执行的stageId最大。但是这里的调度机制是优

    先调度Stageid小的。在DAGScheduler中控制Stage是否被提交到队列中,如果还有父母

    Stage未执行完,则该stage的Taskset不会提交到调度池中,这就保证了虽然最先做的stage

    的id大,但是排序完,由于后面的还没提交到调度池中,所以会先执行。由此可见,stage的TaskSet调度逻辑主要在DAGScheduler中,而Job调度由FIFO或者FAIR算法调度。

    Job调度的FIFO或FAIR方式是通过Pool类实现的。在下面代码为Pool类的实现、代码通

    过taskSetSchedulingAlgorithm选择使用FIFO还是FAIR进行Job调度。

    private[spark] class Pool(……

    var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {

    schedulingMode match {

    case SchedulingMode.FAIR =>

    new FairSchedulingAlgorithm

    case SchedulingMode.FIFO =>

    new FIFOSchedulingAlgorithm

    }

    }……

    这里是使用调度算法的地方,实际上通过调度算法进行了Job的调度和Job内的

    TaskSetManager的两级调度。获取优先级最高的可调度资源执行

    override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {

    var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]

    val sortedSchedulableQueue =

    使用比较器进行排序

    schedulableQueue.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)

    for (schedulable <- sortedSchedulableQueue) {

    sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue

    }

    sortedTaskSetQueue

    }

    这里是使用调度算法的地方,实际上通过调度算法进行了Job的调度和Job内的

    TaskSetManager的两级调度。获取优先级最高的可调度资源执行。这里使用了设计模式中

    的策略模式,使用FIFO充当TaskSetManager的比较器。

    (2)FAIR模式

    从Spark 0.8版本开始,可以通过配置FAIR共享调度模式调度Job,如图4-7所示。在

    FAIR共享模式调度下,Spark在多Job之间以轮询(round robin)方式为任务分配资源,所有

    的任务拥有大致相当的优先级来共享集群的资源。这就意味着当一个长任务正在执行时,短

    任务仍可以分配到资源,提交并执行,并且获得不错的响应时间。这样就不用像以前一样需要等待长任务执行完才可以。这种调度模式很适合多用户的场景。用户可以通过配置

    spark.scheduler.mode方式来让应用以FAIR模式调度。FAIR调度器同样支持将Job分组加入

    调度池中调度,用户可以同时针对不同优先级对每个调度池配置不同的调度权重。这种方式

    允许更重要的Job配置在高优先级池中优先调度。这种方式借鉴了Hadoop的FAIR调度模型,如图4-7所示。

    如果读者对FAIR调度模式的源码感兴趣,可以参照FairSchedulingAlgorithm.scala源码

    了解,限于篇幅先不在这里介绍。

    在默认情况下,每个调度池拥有相同的优先级来共享整个集群的资源,同样default

    pool中的每个Job也拥有同样优先级进行资源共享,但是在用户创建的每个资源池中,Job是

    通过FIFO方式进行调度的。例如,如果每个用户都创建了一个调度池,这就意味着每个用户

    的调度池将会获得同样的优先级来共享整个集群,但是每个用户的调度池内部的请求是按照

    先进先出的方式调度的,后到的请求不能比先到的请求更早获得资源。

    在没有外部干预的情况下,新提交的任务放入default ......

您现在查看是摘要介绍页, 详见PDF附件(7270KB,397页)